From 018e7d3f9c4bcacce716dd607994486a31ee71bb Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Thu, 1 Jun 2023 14:45:40 -0700
Subject: [PATCH] GH-35730: [C++] Add the ability to specify custom schema on a
 dataset write (#35860)

### Rationale for this change

The dataset write node previously allowed you to attach custom key/value metadata to the written data. This was added to support saving schema metadata. However, that approach doesn't capture field metadata or field nullability. This PR adds the ability to specify a custom schema instead. The custom schema must have the same number of fields as the input to the write node and each field must have the same type.

### What changes are included in this PR?

Added `custom_schema` to `WriteNodeOptions`. `custom_metadata` remains for compatibility, but it is an error to supply both; when a custom schema is given, its metadata is used instead of `custom_metadata`.
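As a quick sketch of the new option in use (illustrative only, not code from this patch; `fs_write_options` and `table` are assumed to be configured by the caller, for example as in the new unit test below):

```cpp
#include <memory>
#include <utility>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/plan.h"
#include "arrow/status.h"
#include "arrow/type.h"

// Illustrative only: `fs_write_options` (FileSystemDatasetWriteOptions) and
// `table` (assumed here to hold a single int32 column named "x") are set up
// by the caller.
arrow::Status WriteWithCustomSchema(
    arrow::dataset::FileSystemDatasetWriteOptions fs_write_options,
    std::shared_ptr<arrow::Table> table) {
  arrow::dataset::internal::Initialize();  // registers the "write" node factory

  arrow::dataset::WriteNodeOptions write_options(std::move(fs_write_options));
  // The custom schema must match the input field-for-field in type; only
  // nullability and metadata may differ.
  write_options.custom_schema =
      arrow::schema({arrow::field("x", arrow::int32(), /*nullable=*/false)});

  arrow::acero::Declaration plan = arrow::acero::Declaration::Sequence(
      {{"table_source", arrow::acero::TableSourceNodeOptions(std::move(table))},
       {"write", std::move(write_options)}});
  return arrow::acero::DeclarationToStatus(std::move(plan));
}
```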
### Are these changes tested?

Yes, I added a new C++ unit test to verify that the custom info is applied to written files.

### Are there any user-facing changes?

No breaking changes; the only user-facing change is the new `custom_schema` option.

* Closes: #35730

Lead-authored-by: Weston Pace
Co-authored-by: Nic Crane
Co-authored-by: Joris Van den Bossche
Co-authored-by: anjakefala
Co-authored-by: Antoine Pitrou
Signed-off-by: Weston Pace
---
 cpp/src/arrow/dataset/CMakeLists.txt     |   1 +
 cpp/src/arrow/dataset/file_base.cc       |  53 +++++--
 cpp/src/arrow/dataset/file_base.h        |  10 ++
 cpp/src/arrow/dataset/write_node_test.cc | 174 +++++++++++++++++++++++
 python/pyarrow/tests/test_dataset.py     |  53 +++++++
 r/R/arrowExports.R                       |   4 +-
 r/R/query-engine.R                       |   4 +-
 r/R/schema.R                             |   2 +-
 r/src/arrowExports.cpp                   |  10 +-
 r/src/compute-exec.cpp                   |  28 ++--
 10 files changed, 310 insertions(+), 29 deletions(-)
 create mode 100644 cpp/src/arrow/dataset/write_node_test.cc

diff --git a/cpp/src/arrow/dataset/CMakeLists.txt b/cpp/src/arrow/dataset/CMakeLists.txt
index 7bffdbf08c1e6..e3221d8283d27 100644
--- a/cpp/src/arrow/dataset/CMakeLists.txt
+++ b/cpp/src/arrow/dataset/CMakeLists.txt
@@ -151,6 +151,7 @@ add_arrow_dataset_test(file_test)
 add_arrow_dataset_test(partition_test)
 add_arrow_dataset_test(scanner_test)
 add_arrow_dataset_test(subtree_test)
+add_arrow_dataset_test(write_node_test)
 
 if(ARROW_CSV)
   add_arrow_dataset_test(file_csv_test)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index b300bd67cee8e..2fcd57d2f3622 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -387,16 +387,16 @@ Status WriteBatch(
 
 class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
  public:
-  DatasetWritingSinkNodeConsumer(std::shared_ptr<const KeyValueMetadata> custom_metadata,
+  DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> custom_schema,
                                  FileSystemDatasetWriteOptions write_options)
-      : custom_metadata_(std::move(custom_metadata)),
+      : custom_schema_(std::move(custom_schema)),
         write_options_(std::move(write_options)) {}
 
   Status Init(const std::shared_ptr<Schema>& schema,
               acero::BackpressureControl* backpressure_control,
               acero::ExecPlan* plan) override {
-    if (custom_metadata_) {
-      schema_ = schema->WithMetadata(custom_metadata_);
+    if (custom_schema_) {
+      schema_ = custom_schema_;
     } else {
       schema_ = schema;
     }
@@ -434,7 +434,7 @@ class DatasetWritingSinkNodeConsumer : public acero::SinkNodeConsumer {
     });
   }
 
-  std::shared_ptr<const KeyValueMetadata> custom_metadata_;
+  std::shared_ptr<Schema> custom_schema_;
   std::unique_ptr<internal::DatasetWriter> dataset_writer_;
   FileSystemDatasetWriteOptions write_options_;
   Future<> finished_ = Future<>::Make();
@@ -453,13 +453,16 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
   // The projected_schema is currently used by pyarrow to preserve the custom metadata
   // when reading from a single input file.
-  const auto& custom_metadata = scanner->options()->projected_schema->metadata();
+  const auto& custom_schema = scanner->options()->projected_schema;
+
+  WriteNodeOptions write_node_options(write_options);
+  write_node_options.custom_schema = custom_schema;
 
   acero::Declaration plan = acero::Declaration::Sequence({
       {"scan", ScanNodeOptions{dataset, scanner->options()}},
       {"filter", acero::FilterNodeOptions{scanner->options()->filter}},
       {"project", acero::ProjectNodeOptions{std::move(exprs), std::move(names)}},
-      {"write", WriteNodeOptions{write_options, custom_metadata}},
+      {"write", std::move(write_node_options)},
   });
 
   return acero::DeclarationToStatus(std::move(plan), scanner->options()->use_threads);
@@ -475,16 +478,50 @@ Result<acero::ExecNode*> MakeWriteNode(acero::ExecPlan* plan,
 
   const WriteNodeOptions write_node_options =
       checked_cast<const WriteNodeOptions&>(options);
+  std::shared_ptr<Schema> custom_schema = write_node_options.custom_schema;
   const std::shared_ptr<const KeyValueMetadata>& custom_metadata =
       write_node_options.custom_metadata;
   const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
 
+  const std::shared_ptr<Schema>& input_schema = inputs[0]->output_schema();
+
+  if (custom_schema != nullptr) {
+    if (custom_metadata) {
+      return Status::TypeError(
+          "Do not provide both custom_metadata and custom_schema. If custom_schema is "
+          "used then custom_schema->metadata should be used instead of custom_metadata");
+    }
+
+    if (custom_schema->num_fields() != input_schema->num_fields()) {
+      return Status::TypeError(
+          "The provided custom_schema did not have the same number of fields as the "
+          "data. The custom schema can only be used to add metadata / nullability to "
+          "fields and cannot change the type or number of fields.");
+    }
+    for (int field_idx = 0; field_idx < input_schema->num_fields(); field_idx++) {
+      if (!input_schema->field(field_idx)->type()->Equals(
+              custom_schema->field(field_idx)->type())) {
+        return Status::TypeError(
+            "The provided custom_schema specified type ",
+            custom_schema->field(field_idx)->type()->ToString(), " for field ",
+            field_idx, " and the input data has type ",
+            input_schema->field(field_idx)->type()->ToString(),
+            ". The custom schema can only be used to add metadata / nullability to "
+            "fields and cannot change the type or number of fields.");
+      }
+    }
+  }
+
+  if (custom_metadata) {
+    custom_schema = input_schema->WithMetadata(custom_metadata);
+  }
+
   if (!write_options.partitioning) {
     return Status::Invalid("Must provide partitioning");
   }
 
   std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
-      std::make_shared<DatasetWritingSinkNodeConsumer>(custom_metadata, write_options);
+      std::make_shared<DatasetWritingSinkNodeConsumer>(custom_schema, write_options);
 
   ARROW_ASSIGN_OR_RAISE(
       auto node,
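A standalone sketch of the contract enforced by `MakeWriteNode` above (illustrative, not part of the diff): schemas that only vary metadata or nullability pass, while anything that changes types or field count is rejected with a `TypeError`:

```cpp
#include <memory>

#include "arrow/type.h"
#include "arrow/util/key_value_metadata.h"

int main() {
  // Schema produced by the write node's input: a single int32 field.
  std::shared_ptr<arrow::Schema> input =
      arrow::schema({arrow::field("x", arrow::int32())});

  // Accepted: same field count and type; only nullability and metadata differ.
  std::shared_ptr<arrow::Schema> ok = arrow::schema(
      {arrow::field("x", arrow::int32(), /*nullable=*/false,
                    arrow::key_value_metadata({{"unit", "meters"}}))});

  // Rejected with TypeError: the field type changed (int32 -> int64).
  std::shared_ptr<arrow::Schema> wrong_type =
      arrow::schema({arrow::field("x", arrow::int64())});

  // Rejected with TypeError: the field count changed (1 -> 2).
  std::shared_ptr<arrow::Schema> wrong_count = arrow::schema(
      {arrow::field("x", arrow::int32()), arrow::field("y", arrow::int32())});
  return 0;
}
```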
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index 788a1bb432e74..d33d88e9966fe 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -33,6 +33,7 @@
 #include "arrow/dataset/visibility.h"
 #include "arrow/filesystem/filesystem.h"
 #include "arrow/io/file.h"
+#include "arrow/type_fwd.h"
 #include "arrow/util/compression.h"
 
 namespace arrow {
@@ -470,6 +471,15 @@ class ARROW_DS_EXPORT WriteNodeOptions : public acero::ExecNodeOptions {
   /// \brief Options to control how to write the dataset
   FileSystemDatasetWriteOptions write_options;
+  /// \brief Optional schema to attach to all written batches
+  ///
+  /// By default, we will use the output schema of the input.
+  ///
+  /// This can be used to alter schema metadata, field nullability, or field metadata.
+  /// However, this cannot be used to change the type of data. If the custom schema does
+  /// not have the same number of fields and the same data types as the input then the
+  /// plan will fail.
+  std::shared_ptr<Schema> custom_schema;
   /// \brief Optional metadata to attach to written batches
   std::shared_ptr<const KeyValueMetadata> custom_metadata;
 };
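In practice a caller will usually derive `custom_schema` from the input schema rather than write one from scratch, so the field count and types match by construction. A small sketch (assumes an existing `input_schema`; not part of the diff):

```cpp
#include <memory>

#include "arrow/result.h"
#include "arrow/type.h"
#include "arrow/util/key_value_metadata.h"

// Returns a copy of `input_schema` with schema-level metadata attached and
// field 0 marked non-nullable; types and field count are unchanged, so the
// result satisfies the write node's validation.
arrow::Result<std::shared_ptr<arrow::Schema>> MakeCustomSchema(
    const std::shared_ptr<arrow::Schema>& input_schema) {
  std::shared_ptr<arrow::Schema> with_meta = input_schema->WithMetadata(
      arrow::key_value_metadata({{"origin", "my_pipeline"}}));
  return with_meta->SetField(0, with_meta->field(0)->WithNullable(false));
}
```

Passing the result as `WriteNodeOptions::custom_schema` then behaves like the old `custom_metadata` path, with the nullability and field-metadata changes applied on top.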
diff --git a/cpp/src/arrow/dataset/write_node_test.cc b/cpp/src/arrow/dataset/write_node_test.cc
new file mode 100644
index 0000000000000..f420bd2e6ca67
--- /dev/null
+++ b/cpp/src/arrow/dataset/write_node_test.cc
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/dataset/partition.h"
+#include "arrow/dataset/plan.h"
+#include "arrow/filesystem/filesystem.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/matchers.h"
+
+#include "arrow/table.h"
+#include "arrow/util/key_value_metadata.h"
+
+namespace arrow {
+
+namespace dataset {
+
+class SimpleWriteNodeTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    internal::Initialize();
+    mock_fs_ = std::make_shared<fs::internal::MockFileSystem>(fs::kNoTime);
+    auto ipc_format = std::make_shared<IpcFileFormat>();
+
+    fs_write_options_.filesystem = mock_fs_;
+    fs_write_options_.base_dir = "/my_dataset";
+    fs_write_options_.basename_template = "{i}.arrow";
+    fs_write_options_.file_write_options = ipc_format->DefaultWriteOptions();
+    fs_write_options_.partitioning = dataset::Partitioning::Default();
+  }
+
+  std::shared_ptr<fs::internal::MockFileSystem> mock_fs_;
+  dataset::FileSystemDatasetWriteOptions fs_write_options_;
+};
+
+TEST_F(SimpleWriteNodeTest, CustomNullability) {
+  // Create an input table with a nullable and a non-nullable type
+  ExecBatch batch = gen::Gen({gen::Step()})->FailOnError()->ExecBatch(/*num_rows=*/1);
+  std::shared_ptr<Schema> test_schema =
+      schema({field("nullable_i32", uint32(), /*nullable=*/true),
+              field("non_nullable_i32", uint32(), /*nullable=*/false)});
+  std::shared_ptr<RecordBatch> record_batch =
+      RecordBatch::Make(test_schema, /*num_rows=*/1,
+                        {batch.values[0].make_array(), batch.values[0].make_array()});
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> table,
+                       Table::FromRecordBatches({std::move(record_batch)}));
+
+  ASSERT_TRUE(table->field(0)->nullable());
+  ASSERT_FALSE(table->field(1)->nullable());
+
+  dataset::WriteNodeOptions write_options(fs_write_options_);
+  write_options.custom_schema = test_schema;
+
+  // Write the data to disk (these plans use a project because it destroys whatever
+  // metadata happened to be in the table source node's output schema). This more
+  // accurately simulates reading from a dataset.
+  acero::Declaration plan = acero::Declaration::Sequence(
+      {{"table_source", acero::TableSourceNodeOptions(table)},
+       {"project",
+        acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+       {"write", write_options}});
+
+  ASSERT_OK(DeclarationToStatus(plan));
+
+  // Read the file back out and verify the nullability
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
+                       mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
+                       ipc::RecordBatchFileReader::Open(file));
+  std::shared_ptr<Schema> file_schema = file_reader->schema();
+
+  ASSERT_TRUE(file_schema->field(0)->nullable());
+  ASSERT_FALSE(file_schema->field(1)->nullable());
+
+  // Invalid custom schema
+
+  // Incorrect # of fields
+  write_options.custom_schema = schema({});
+  plan = acero::Declaration::Sequence(
+      {{"table_source", acero::TableSourceNodeOptions(table)},
+       {"project",
+        acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+       {"write", write_options}});
+
+  ASSERT_THAT(
+      DeclarationToStatus(plan),
+      Raises(StatusCode::TypeError,
+             ::testing::HasSubstr("did not have the same number of fields as the data")));
+
+  // Incorrect types
+  write_options.custom_schema =
+      schema({field("nullable_i32", int32()), field("non_nullable_i32", int32())});
+  plan = acero::Declaration::Sequence(
+      {{"table_source", acero::TableSourceNodeOptions(table)},
+       {"project",
+        acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+       {"write", write_options}});
+  ASSERT_THAT(
+      DeclarationToStatus(plan),
+      Raises(StatusCode::TypeError, ::testing::HasSubstr("and the input data has type")));
+
+  // Cannot have both custom_schema and custom_metadata
+  write_options.custom_schema = test_schema;
+  write_options.custom_metadata = key_value_metadata({{"foo", "bar"}});
+  plan = acero::Declaration::Sequence(
+      {{"table_source", acero::TableSourceNodeOptions(std::move(table))},
+       {"project",
+        acero::ProjectNodeOptions({compute::field_ref(0), compute::field_ref(1)})},
+       {"write", write_options}});
+  ASSERT_THAT(DeclarationToStatus(plan),
+              Raises(StatusCode::TypeError,
+                     ::testing::HasSubstr(
+                         "Do not provide both custom_metadata and custom_schema")));
+}
+
+TEST_F(SimpleWriteNodeTest, CustomMetadata) {
+  constexpr int64_t kRowsPerChunk = 1;
+  constexpr int64_t kNumChunks = 1;
+  // Create an input table with no schema metadata
+  std::shared_ptr<Table> table =
+      gen::Gen({gen::Step()})->FailOnError()->Table(kRowsPerChunk, kNumChunks);
+
+  std::shared_ptr<KeyValueMetadata> custom_metadata =
+      key_value_metadata({{"foo", "bar"}});
+
+  dataset::WriteNodeOptions write_options(fs_write_options_);
+  write_options.custom_metadata = custom_metadata;
+
+  // Write the data to disk
+  acero::Declaration plan = acero::Declaration::Sequence(
+      {{"table_source", acero::TableSourceNodeOptions(table)},
+       {"project", acero::ProjectNodeOptions({compute::field_ref(0)})},
+       {"write", write_options}});
+
+  ASSERT_OK(DeclarationToStatus(plan));
+
+  // Read the file back out and verify the schema metadata
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
+                       mock_fs_->OpenInputFile("/my_dataset/0.arrow"));
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> file_reader,
+                       ipc::RecordBatchFileReader::Open(file));
+  std::shared_ptr<Schema> file_schema = file_reader->schema();
+
+  ASSERT_TRUE(custom_metadata->Equals(*file_schema->metadata()));
+}
+
+}  // namespace dataset
+}  // namespace arrow
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 66562b76c96b0..144da21cf5e6b 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -5172,6 +5172,59 @@ def test_dataset_partition_with_slash(tmpdir):
     assert encoded_paths == file_paths
 
 
+@pytest.mark.parquet
+def test_write_dataset_preserve_nullability(tempdir):
+    # GH-35730
+    schema_nullable = pa.schema([
+        pa.field("x", pa.int64(), nullable=False),
+        pa.field("y", pa.int64(), nullable=True)])
+
+    arrays = [[1, 2, 3], [None, 5, None]]
+    table = pa.Table.from_arrays(arrays, schema=schema_nullable)
+
+    pq.write_to_dataset(table, tempdir / "nulltest1")
+    dataset = ds.dataset(tempdir / "nulltest1", format="parquet")
+    # nullability of field is preserved
+    assert dataset.to_table().schema.equals(schema_nullable)
+
+    ds.write_dataset(table, tempdir / "nulltest2", format="parquet")
+    dataset = ds.dataset(tempdir / "nulltest2", format="parquet")
+    assert dataset.to_table().schema.equals(schema_nullable)
+
+    ds.write_dataset([table, table], tempdir / "nulltest3", format="parquet")
+    dataset = ds.dataset(tempdir / "nulltest3", format="parquet")
+    assert dataset.to_table().schema.equals(schema_nullable)
+
+
+@pytest.mark.parquet
+def test_write_dataset_preserve_field_metadata(tempdir):
+    schema_metadata = pa.schema([
+        pa.field("x", pa.int64(), metadata={b'foo': b'bar'}),
+        pa.field("y", pa.int64())])
+
+    schema_no_meta = pa.schema([
+        pa.field("x", pa.int64()),
+        pa.field("y", pa.int64())])
+
+    arrays = [[1, 2, 3], [None, 5, None]]
+    table = pa.Table.from_arrays(arrays, schema=schema_metadata)
+    table_no_meta = pa.Table.from_arrays(arrays, schema=schema_no_meta)
+
+    # If no schema is provided the schema of the first table will be used
+    ds.write_dataset([table, table_no_meta], tempdir / "test1", format="parquet")
+    dataset = ds.dataset(tempdir / "test1", format="parquet")
+    assert dataset.to_table().schema.equals(schema_metadata, check_metadata=True)
+
+    ds.write_dataset([table_no_meta, table], tempdir / "test2", format="parquet")
+    dataset = ds.dataset(tempdir / "test2", format="parquet")
+    assert dataset.to_table().schema.equals(schema_no_meta, check_metadata=True)
+
+    # If a schema is provided it will override the schema of the input
+    ds.write_dataset([table_no_meta, table], tempdir / "test3", format="parquet",
+                     schema=schema_metadata)
+    dataset = ds.dataset(tempdir / "test3", format="parquet")
+    assert dataset.to_table().schema.equals(schema_metadata, check_metadata=True)
+
+
 @pytest.mark.parametrize('dstype', [
[ "fs", "mem" ]) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index b494623eedc41..8a76e678ba21e 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -468,8 +468,8 @@ ExecNode_Scan <- function(plan, dataset, filter, projection) { .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection) } -ExecPlan_Write <- function(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) { - invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group)) +ExecPlan_Write <- function(plan, final_node, schema, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) { + invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, schema, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group)) } ExecNode_Filter <- function(input, filter) { diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 79227546dd3ed..0f8a84f9b867e 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -236,10 +236,12 @@ ExecPlan <- R6Class("ExecPlan", }, Write = function(node, ...) { # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ... + final_metadata <- prepare_key_value_metadata(node$final_metadata()) + ExecPlan_Write( self, node, - prepare_key_value_metadata(node$final_metadata()), + node$schema$WithMetadata(final_metadata), ... ) }, diff --git a/r/R/schema.R b/r/R/schema.R index c8316854c2802..9ff38487c4b6f 100644 --- a/r/R/schema.R +++ b/r/R/schema.R @@ -229,7 +229,7 @@ prepare_key_value_metadata <- function(metadata) { call. 
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 80877e827d1b2..ca4a4be38d060 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1084,12 +1084,12 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP fil
 // compute-exec.cpp
 #if defined(ARROW_R_WITH_DATASET)
-void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<acero::ExecNode>& final_node, cpp11::strings metadata, const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group);
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
+void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const std::shared_ptr<acero::ExecNode>& final_node, const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group);
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<acero::ExecPlan>&>::type plan(plan_sexp);
 	arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type final_node(final_node_sexp);
-	arrow::r::Input<cpp11::strings>::type metadata(metadata_sexp);
+	arrow::r::Input<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
 	arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
 	arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
 	arrow::r::Input<std::string>::type base_dir(base_dir_sexp);
@@ -1101,12 +1101,12 @@ BEGIN_CPP11
 	arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp);
 	arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp);
 	arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp);
-	ExecPlan_Write(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
+	ExecPlan_Write(plan, final_node, schema, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
 	return R_NilValue;
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
 	Rf_error("Cannot call ExecPlan_Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 0f269bed11261..e0b3c62c47d7f 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -307,15 +307,18 @@ std::shared_ptr<acero::ExecNode> ExecNode_Scan(
 }
 
 // [[dataset::export]]
-void ExecPlan_Write(
-    const std::shared_ptr<acero::ExecPlan>& plan,
-    const std::shared_ptr<acero::ExecNode>& final_node, cpp11::strings metadata,
-    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
-    const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
-    const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
-    arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions,
-    uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group,
-    uint64_t max_rows_per_group) {
+void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan,
+                    const std::shared_ptr<acero::ExecNode>& final_node,
+                    const std::shared_ptr<arrow::Schema>& schema,
+                    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+                    const std::shared_ptr<fs::FileSystem>& filesystem,
+                    std::string base_dir,
+                    const std::shared_ptr<ds::Partitioning>& partitioning,
+                    std::string basename_template,
+                    arrow::dataset::ExistingDataBehavior existing_data_behavior,
+                    int max_partitions, uint32_t max_open_files,
+                    uint64_t max_rows_per_file, uint64_t min_rows_per_group,
+                    uint64_t max_rows_per_group) {
   arrow::dataset::internal::Initialize();
 
   // TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
@@ -333,9 +336,10 @@ void ExecPlan_Write(
   opts.min_rows_per_group = min_rows_per_group;
   opts.max_rows_per_group = max_rows_per_group;
 
-  auto kv = strings_to_kvm(metadata);
-  MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()},
-                     ds::WriteNodeOptions{std::move(opts), std::move(kv)});
+  ds::WriteNodeOptions options(std::move(opts));
+  options.custom_schema = std::move(schema);
+
+  MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()},
+                     std::move(options));
 
   StopIfNotOk(plan->Validate());