diff --git a/c_glib/arrow-glib/compute.cpp b/c_glib/arrow-glib/compute.cpp
index ca5bc1a76f54d..193ba2837b906 100644
--- a/c_glib/arrow-glib/compute.cpp
+++ b/c_glib/arrow-glib/compute.cpp
@@ -1254,9 +1254,7 @@ garrow_aggregate_node_options_new(GList *aggregations,
                                   gsize n_keys,
                                   GError **error)
 {
-  std::vector<arrow::compute::internal::Aggregate> arrow_aggregates;
-  std::vector<arrow::FieldRef> arrow_targets;
-  std::vector<std::string> arrow_names;
+  std::vector<arrow::compute::Aggregate> arrow_aggregates;
   std::vector<arrow::FieldRef> arrow_keys;
   for (auto node = aggregations; node; node = node->next) {
     auto aggregation_priv = GARROW_AGGREGATION_GET_PRIVATE(node->data);
@@ -1265,21 +1263,19 @@ garrow_aggregate_node_options_new(GList *aggregations,
       function_options = garrow_function_options_get_raw(aggregation_priv->options);
     };
-    if (function_options) {
-      arrow_aggregates.push_back({
-        aggregation_priv->function,
-        function_options->Copy(),
-      });
-    } else {
-      arrow_aggregates.push_back({aggregation_priv->function, nullptr});
-    };
+    std::vector<arrow::FieldRef> arrow_targets;
     if (!garrow_field_refs_add(arrow_targets,
                                aggregation_priv->input,
                                error,
                                "[aggregate-node-options][new][input]")) {
       return NULL;
     }
-    arrow_names.emplace_back(aggregation_priv->output);
+    arrow_aggregates.push_back({
+      aggregation_priv->function,
+      function_options ? function_options->Copy() : nullptr,
+      arrow_targets[0],
+      aggregation_priv->output,
+    });
   }
   for (gsize i = 0; i < n_keys; ++i) {
     if (!garrow_field_refs_add(arrow_keys,
@@ -1291,8 +1287,6 @@ garrow_aggregate_node_options_new(GList *aggregations,
   }
   auto arrow_options =
     new arrow::compute::AggregateNodeOptions(std::move(arrow_aggregates),
-                                             std::move(arrow_targets),
-                                             std::move(arrow_names),
                                              std::move(arrow_keys));
   auto options = g_object_new(GARROW_TYPE_AGGREGATE_NODE_OPTIONS,
                               "options", arrow_options,
diff --git a/c_glib/arrow-glib/compute.h b/c_glib/arrow-glib/compute.h
index 818b76ed72c85..32db15be8b055 100644
--- a/c_glib/arrow-glib/compute.h
+++ b/c_glib/arrow-glib/compute.h
@@ -755,7 +755,7 @@ garrow_quantile_options_get_qs(GArrowQuantileOptions *options,
 GARROW_AVAILABLE_IN_9_0
 void
 garrow_quantile_options_set_q(GArrowQuantileOptions *options,
-                              gdouble quantile);
+                              gdouble q);
 GARROW_AVAILABLE_IN_9_0
 void
 garrow_quantile_options_set_qs(GArrowQuantileOptions *options,
diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index 5f80119bbbac8..331388a529134 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -502,9 +502,8 @@ arrow::Status SourceScalarAggregateSinkExample(cp::ExecContext& exec_context) {
   ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
                         cp::MakeExecNode("source", plan.get(), {}, source_node_options));
 
-  auto aggregate_options = cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr}},
-                                                    /*targets=*/{"a"},
-                                                    /*names=*/{"sum(a)"}};
+  auto aggregate_options =
+      cp::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
   ARROW_ASSIGN_OR_RAISE(
       cp::ExecNode * aggregate,
       cp::MakeExecNode("aggregate", plan.get(), {source}, std::move(aggregate_options)));
@@ -541,9 +540,7 @@ arrow::Status SourceGroupAggregateSinkExample(cp::ExecContext& exec_context) {
                         cp::MakeExecNode("source", plan.get(), {}, source_node_options));
 
   auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
   auto aggregate_options =
-      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options}},
-                               /*targets=*/{"a"},
-                               /*names=*/{"count(a)"},
+      cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
                                /*keys=*/{"b"}};
   ARROW_ASSIGN_OR_RAISE(
       cp::ExecNode * aggregate,
diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h
index becd5a7414960..55f434c6659b0 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -393,8 +393,6 @@ ARROW_EXPORT
 Result<Datum> Index(const Datum& value, const IndexOptions& options,
                     ExecContext* ctx = NULLPTR);
 
-namespace internal {
-
 /// \brief Configure a grouped aggregation
 struct ARROW_EXPORT Aggregate {
   /// the name of the aggregation function
@@ -402,8 +400,13 @@ struct ARROW_EXPORT Aggregate {
 
   /// options for the aggregation function
   std::shared_ptr<FunctionOptions> options;
+
+  // fields to which aggregations will be applied
+  FieldRef target;
+
+  // output field name for aggregations
+  std::string name;
 };
-}  // namespace internal
 
 }  // namespace compute
 }  // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/aggregate.cc b/cpp/src/arrow/compute/exec/aggregate.cc
index 934cdd4d1fdd9..41b5bb75b669a 100644
--- a/cpp/src/arrow/compute/exec/aggregate.cc
+++ b/cpp/src/arrow/compute/exec/aggregate.cc
@@ -22,6 +22,7 @@
 #include "arrow/compute/exec_internal.h"
 #include "arrow/compute/registry.h"
 #include "arrow/compute/row/grouper.h"
+#include "arrow/util/checked_cast.h"
 #include "arrow/util/task_group.h"
 
 namespace arrow {
@@ -55,7 +56,9 @@ Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
   std::vector<std::unique_ptr<KernelState>> states(kernels.size());
 
   for (size_t i = 0; i < aggregates.size(); ++i) {
-    const FunctionOptions* options = aggregates[i].options.get();
+    const FunctionOptions* options =
+        arrow::internal::checked_cast<const FunctionOptions*>(
+            aggregates[i].options.get());
 
     if (options == nullptr) {
       // use known default options for the named function if possible
diff --git a/cpp/src/arrow/compute/exec/aggregate.h b/cpp/src/arrow/compute/exec/aggregate.h
index 2c62acf2319f2..753b0a8c47e15 100644
--- a/cpp/src/arrow/compute/exec/aggregate.h
+++ b/cpp/src/arrow/compute/exec/aggregate.h
@@ -41,16 +41,15 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments,
 
 Result<std::vector<const HashAggregateKernel*>> GetKernels(
-    ExecContext* ctx, const std::vector<internal::Aggregate>& aggregates,
+    ExecContext* ctx, const std::vector<Aggregate>& aggregates,
     const std::vector<ValueDescr>& in_descrs);
 
 Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
     const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
-    const std::vector<internal::Aggregate>& aggregates,
-    const std::vector<ValueDescr>& in_descrs);
+    const std::vector<Aggregate>& aggregates, const std::vector<ValueDescr>& in_descrs);
 
 Result<FieldVector> ResolveKernels(
-    const std::vector<internal::Aggregate>& aggregates,
+    const std::vector<Aggregate>& aggregates,
     const std::vector<const HashAggregateKernel*>& kernels,
     const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
     const std::vector<ValueDescr>& descrs);
diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc
index c5c5d3efcfc54..8c7899c41ec71 100644
--- a/cpp/src/arrow/compute/exec/aggregate_node.cc
+++ b/cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -44,7 +44,7 @@ namespace compute {
 namespace {
 
 void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
-                        const std::vector<internal::Aggregate>& aggs,
+                        const std::vector<Aggregate>& aggs,
                         const std::vector<int>& target_field_ids, int indent = 0) {
   *ss << "aggregates=[" << std::endl;
   for (size_t i = 0; i < aggs.size(); i++) {
@@ -64,8 +64,7 @@ class ScalarAggregateNode : public ExecNode {
  public:
   ScalarAggregateNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
                       std::shared_ptr<Schema> output_schema,
-                      std::vector<int> target_field_ids,
-                      std::vector<internal::Aggregate> aggs,
+                      std::vector<int> target_field_ids, std::vector<Aggregate> aggs,
                       std::vector<const ScalarAggregateKernel*> kernels,
                       std::vector<std::vector<std::unique_ptr<KernelState>>> states)
       : ExecNode(plan, std::move(inputs), {"target"},
@@ -89,12 +88,12 @@
     std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
     std::vector<std::vector<std::unique_ptr<KernelState>>> states(kernels.size());
     FieldVector fields(kernels.size());
-    const auto& field_names = aggregate_options.names;
     std::vector<int> target_field_ids(kernels.size());
     for (size_t i = 0; i < kernels.size(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(auto match,
-                            FieldRef(aggregate_options.targets[i]).FindOne(input_schema));
+      ARROW_ASSIGN_OR_RAISE(
+          auto match,
+          FieldRef(aggregate_options.aggregates[i].target).FindOne(input_schema));
       target_field_ids[i] = match[0];
 
       ARROW_ASSIGN_OR_RAISE(
@@ -129,7 +128,7 @@ class ScalarAggregateNode : public ExecNode {
       ARROW_ASSIGN_OR_RAISE(
           auto descr, kernels[i]->signature->out_type().Resolve(&kernel_ctx, {in_type}));
-      fields[i] = field(field_names[i], std::move(descr.type));
+      fields[i] = field(aggregate_options.aggregates[i].name, std::move(descr.type));
     }
 
     return plan->EmplaceNode<ScalarAggregateNode>(
@@ -263,7 +262,7 @@ class ScalarAggregateNode : public ExecNode {
   }
 
   const std::vector<int> target_field_ids_;
-  const std::vector<internal::Aggregate> aggs_;
+  const std::vector<Aggregate> aggs_;
   const std::vector<const ScalarAggregateKernel*> kernels_;
 
   std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
@@ -276,7 +275,7 @@ class GroupByNode : public ExecNode {
  public:
   GroupByNode(ExecNode* input, std::shared_ptr<Schema> output_schema, ExecContext* ctx,
               std::vector<int> key_field_ids, std::vector<int> agg_src_field_ids,
-              std::vector<internal::Aggregate> aggs,
+              std::vector<Aggregate> aggs,
               std::vector<const HashAggregateKernel*> agg_kernels)
       : ExecNode(input->plan(), {input}, {"groupby"}, std::move(output_schema),
                  /*num_outputs=*/1),
@@ -295,7 +294,6 @@ class GroupByNode : public ExecNode {
     const auto& keys = aggregate_options.keys;
     // Copy (need to modify options pointer below)
     auto aggs = aggregate_options.aggregates;
-    const auto& field_names = aggregate_options.names;
 
     // Get input schema
     auto input_schema = input->output_schema();
@@ -310,13 +308,11 @@
     // Find input field indices for aggregates
     std::vector<int> agg_src_field_ids(aggs.size());
     for (size_t i = 0; i < aggs.size(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(auto match,
-                            aggregate_options.targets[i].FindOne(*input_schema));
+      ARROW_ASSIGN_OR_RAISE(auto match, aggs[i].target.FindOne(*input_schema));
       agg_src_field_ids[i] = match[0];
     }
 
     // Build vector of aggregate source field data types
-    DCHECK_EQ(aggregate_options.targets.size(), aggs.size());
     std::vector<ValueDescr> agg_src_descrs(aggs.size());
     for (size_t i = 0; i < aggs.size(); ++i) {
       auto agg_src_field_id = agg_src_field_ids[i];
@@ -342,7 +338,8 @@
 
     // Aggregate fields come before key fields to match the behavior of GroupBy function
     for (size_t i = 0; i < aggs.size(); ++i) {
-      output_fields[i] = agg_result_fields[i]->WithName(field_names[i]);
+      output_fields[i] =
+          agg_result_fields[i]->WithName(aggregate_options.aggregates[i].name);
     }
     size_t base = aggs.size();
     for (size_t i = 0; i < keys.size(); ++i) {
@@ -660,7 +657,7 @@ class GroupByNode : public ExecNode {
 
   const std::vector<int> key_field_ids_;
   const std::vector<int> agg_src_field_ids_;
-  const std::vector<internal::Aggregate> aggs_;
+  const std::vector<Aggregate> aggs_;
   const std::vector<const HashAggregateKernel*> agg_kernels_;
 
   ThreadIndexer get_thread_index_;
diff --git a/cpp/src/arrow/compute/exec/ir_consumer.cc b/cpp/src/arrow/compute/exec/ir_consumer.cc
index 0aafa2c281984..f17dbf1ed7962 100644
--- a/cpp/src/arrow/compute/exec/ir_consumer.cc
+++ b/cpp/src/arrow/compute/exec/ir_consumer.cc
@@ -531,7 +531,7 @@ Result<Declaration> Convert(const ir::Relation& rel) {
       ARROW_ASSIGN_OR_RAISE(auto arg, Convert(*aggregate->rel()).As<Declaration>());
 
-      AggregateNodeOptions opts{{}, {}, {}};
+      AggregateNodeOptions opts{{}, {}};
 
      if (!aggregate->measures()) return UnexpectedNullField("Aggregate.measures");
      for (const ir::Expression* m : *aggregate->measures()) {
@@ -550,9 +550,8 @@ Result<Declaration> Convert(const ir::Relation& rel) {
               "Support for non-FieldRef arguments to Aggregate.measures");
         }
 
-        opts.aggregates.push_back({call->function_name, nullptr});
-        opts.targets.push_back(*target);
-        opts.names.push_back(call->function_name + " " + target->ToString());
+        opts.aggregates.push_back({call->function_name, nullptr, *target,
+                                   call->function_name + " " + target->ToString()});
       }
 
       if (!aggregate->groupings()) return UnexpectedNullField("Aggregate.groupings");
diff --git a/cpp/src/arrow/compute/exec/ir_test.cc b/cpp/src/arrow/compute/exec/ir_test.cc
index 847f555c69ace..d7eb37c185e13 100644
--- a/cpp/src/arrow/compute/exec/ir_test.cc
+++ b/cpp/src/arrow/compute/exec/ir_test.cc
@@ -249,7 +249,8 @@ TEST(Relation, Filter) {
 }
 
 TEST(Relation, AggregateSimple) {
-  ASSERT_THAT(ConvertJSON(R"({
+  ASSERT_THAT(
+      ConvertJSON(R"({
     "impl": {
       id: {id: 1},
       "groupings": [
@@ -347,28 +348,22 @@ TEST(Relation, AggregateSimple) {
     },
     "impl_type": "Aggregate"
   })"),
-              ResultWith(Eq(Declaration::Sequence({
-                  {"catalog_source",
-                   CatalogSourceNodeOptions{"tbl", schema({
-                                                       field("foo", int32()),
-                                                       field("bar", int64()),
-                                                       field("baz", float64()),
-                                                   })},
-                   "0"},
-                  {"aggregate",
-                   AggregateNodeOptions{/*aggregates=*/{
-                                            {"sum", nullptr},
-                                            {"mean", nullptr},
-                                        },
-                                        /*targets=*/{1, 2},
-                                        /*names=*/
-                                        {
-                                            "sum FieldRef.FieldPath(1)",
-                                            "mean FieldRef.FieldPath(2)",
-                                        },
-                                        /*keys=*/{0}},
-                   "1"},
-              }))));
+      ResultWith(Eq(Declaration::Sequence({
+          {"catalog_source",
+           CatalogSourceNodeOptions{"tbl", schema({
+                                               field("foo", int32()),
+                                               field("bar", int64()),
+                                               field("baz", float64()),
+                                           })},
+           "0"},
+          {"aggregate",
+           AggregateNodeOptions{/*aggregates=*/{
+                                    {"sum", nullptr, 1, "sum FieldRef.FieldPath(1)"},
+                                    {"mean", nullptr, 2, "mean FieldRef.FieldPath(2)"},
+                                },
+                                /*keys=*/{0}},
+           "1"},
+      }))));
 }
 
 TEST(Relation, AggregateWithHaving) {
@@ -564,14 +559,8 @@ TEST(Relation, AggregateWithHaving) {
                   {"filter", FilterNodeOptions{less(field_ref(0), literal(3))}, "1"},
                   {"aggregate",
                    AggregateNodeOptions{/*aggregates=*/{
-                                            {"sum", nullptr},
-                                            {"mean", nullptr},
-                                        },
-                                        /*targets=*/{1, 2},
-                                        /*names=*/
-                                        {
-                                            "sum FieldRef.FieldPath(1)",
-                                            "mean FieldRef.FieldPath(2)",
+                                            {"sum", nullptr, 1, "sum FieldRef.FieldPath(1)"},
+                                            {"mean", nullptr, 2, "mean FieldRef.FieldPath(2)"},
                                         },
                                         /*keys=*/{0}},
                    "2"},
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index e0fb31963c5f1..a86b6c63d3662 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -111,20 +111,12 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
 /// \brief Make a node which aggregates input batches, optionally grouped by keys.
 class ARROW_EXPORT AggregateNodeOptions : public ExecNodeOptions {
  public:
-  AggregateNodeOptions(std::vector<internal::Aggregate> aggregates,
-                       std::vector<FieldRef> targets, std::vector<std::string> names,
-                       std::vector<FieldRef> keys = {})
-      : aggregates(std::move(aggregates)),
-        targets(std::move(targets)),
-        names(std::move(names)),
-        keys(std::move(keys)) {}
+  explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
+                                std::vector<FieldRef> keys = {})
+      : aggregates(std::move(aggregates)), keys(std::move(keys)) {}
 
   // aggregations which will be applied to the targetted fields
-  std::vector<internal::Aggregate> aggregates;
-  // fields to which aggregations will be applied
-  std::vector<FieldRef> targets;
-  // output field names for aggregations
-  std::vector<std::string> names;
+  std::vector<Aggregate> aggregates;
   // keys by which aggregations will be grouped
   std::vector<FieldRef> keys;
 };
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index 2df3c5e915e3d..9efa6623e5a21 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -391,9 +391,10 @@ TEST(ExecPlan, ToString) {
            }}},
          {"aggregate",
           AggregateNodeOptions{
-              /*aggregates=*/{{"hash_sum", nullptr}, {"hash_count", options}},
-              /*targets=*/{"multiply(i32, 2)", "multiply(i32, 2)"},
-              /*names=*/{"sum(multiply(i32, 2))", "count(multiply(i32, 2))"},
+              /*aggregates=*/{
+                  {"hash_sum", nullptr, "multiply(i32, 2)", "sum(multiply(i32, 2))"},
+                  {"hash_count", options, "multiply(i32, 2)",
+                   "count(multiply(i32, 2))"}},
               /*keys=*/{"bool"}}},
          {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
                                               literal(10))}},
@@ -429,17 +430,16 @@ custom_sink_label:OrderBySinkNode{by={sort_keys=[FieldRef.Name(sum(multiply(i32,
   rhs.label = "rhs";
   union_node.inputs.emplace_back(lhs);
   union_node.inputs.emplace_back(rhs);
-  ASSERT_OK(Declaration::Sequence(
-                {
-                    union_node,
-                    {"aggregate",
-                     AggregateNodeOptions{/*aggregates=*/{{"count", std::move(options)}},
-                                          /*targets=*/{"i32"},
-                                          /*names=*/{"count(i32)"},
-                                          /*keys=*/{}}},
-                    {"sink", SinkNodeOptions{&sink_gen}},
-                })
-                .AddToPlan(plan.get()));
+  ASSERT_OK(
+      Declaration::Sequence(
+          {
+              union_node,
+              {"aggregate", AggregateNodeOptions{
+                                /*aggregates=*/{{"count", options, "i32", "count(i32)"}},
+                                /*keys=*/{}}},
+              {"sink", SinkNodeOptions{&sink_gen}},
+          })
+          .AddToPlan(plan.get()));
 
   EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 5 nodes:
 :SinkNode{}
   :ScalarAggregateNode{aggregates=[
@@ -765,17 +765,17 @@ TEST(ExecPlanExecution, StressSourceGroupedSumStop) {
     auto random_data = MakeRandomBatches(input_schema, num_batches);
 
     SortOptions options({SortKey("a", SortOrder::Ascending)});
-    ASSERT_OK(Declaration::Sequence(
-                  {
-                      {"source", SourceNodeOptions{random_data.schema,
-                                                   random_data.gen(parallel, slow)}},
-                      {"aggregate",
-                       AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
-                                            /*targets=*/{"a"}, /*names=*/{"sum(a)"},
-                                            /*keys=*/{"b"}}},
-                      {"sink", SinkNodeOptions{&sink_gen}},
-                  })
-                  .AddToPlan(plan.get()));
+    ASSERT_OK(
+        Declaration::Sequence(
+            {
+                {"source", SourceNodeOptions{random_data.schema,
+                                             random_data.gen(parallel, slow)}},
+                {"aggregate", AggregateNodeOptions{
+                                  /*aggregates=*/{{"hash_sum", nullptr, "a", "sum(a)"}},
+                                  /*keys=*/{"b"}}},
+                {"sink", SinkNodeOptions{&sink_gen}},
+            })
+            .AddToPlan(plan.get()));
 
     ASSERT_OK(plan->Validate());
     ASSERT_OK(plan->StartProducing());
@@ -913,17 +913,17 @@ TEST(ExecPlanExecution, SourceGroupedSum) {
     ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
     AsyncGenerator<util::optional<ExecBatch>> sink_gen;
 
-    ASSERT_OK(Declaration::Sequence(
-                  {
-                      {"source", SourceNodeOptions{input.schema,
-                                                   input.gen(parallel, /*slow=*/false)}},
-                      {"aggregate",
-                       AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
-                                            /*targets=*/{"i32"}, /*names=*/{"sum(i32)"},
-                                            /*keys=*/{"str"}}},
-                      {"sink", SinkNodeOptions{&sink_gen}},
-                  })
-                  .AddToPlan(plan.get()));
+    ASSERT_OK(
+        Declaration::Sequence(
+            {
+                {"source",
+                 SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
+                {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr,
+                                                                    "i32", "sum(i32)"}},
+                                                   /*keys=*/{"str"}}},
+                {"sink", SinkNodeOptions{&sink_gen}},
+            })
+            .AddToPlan(plan.get()));
 
     ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                 Finishes(ResultWith(UnorderedElementsAreArray({ExecBatchFromJSON(
@@ -987,9 +987,8 @@ TEST(ExecPlanExecution, NestedSourceProjectGroupedSum) {
                          field_ref(FieldRef("struct", "bool")),
                      },
                      {"i32", "bool"}}},
-        {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
-                                           /*targets=*/{"i32"},
-                                           /*names=*/{"sum(i32)"},
+        {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr,
+                                                            "i32", "sum(i32)"}},
                                            /*keys=*/{"bool"}}},
         {"sink", SinkNodeOptions{&sink_gen}},
     })
@@ -1021,10 +1020,11 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) {
                          field_ref("str"),
                          call("multiply", {field_ref("i32"), literal(2)}),
                      }}},
-        {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
-                                           /*targets=*/{"multiply(i32, 2)"},
-                                           /*names=*/{"sum(multiply(i32, 2))"},
-                                           /*keys=*/{"str"}}},
+        {"aggregate",
+         AggregateNodeOptions{
+             /*aggregates=*/{{"hash_sum", nullptr, "multiply(i32, 2)",
+                              "sum(multiply(i32, 2))"}},
+             /*keys=*/{"str"}}},
         {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
                                              literal(10 * batch_multiplicity))}},
         {"sink", SinkNodeOptions{&sink_gen}},
@@ -1060,10 +1060,11 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumOrderBy) {
                          field_ref("str"),
                          call("multiply", {field_ref("i32"), literal(2)}),
                      }}},
-        {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
-                                           /*targets=*/{"multiply(i32, 2)"},
-                                           /*names=*/{"sum(multiply(i32, 2))"},
-                                           /*keys=*/{"str"}}},
+        {"aggregate",
+         AggregateNodeOptions{
+             /*aggregates=*/{{"hash_sum", nullptr, "multiply(i32, 2)",
+                              "sum(multiply(i32, 2))"}},
+             /*keys=*/{"str"}}},
         {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
                                              literal(10 * batch_multiplicity))}},
         {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
@@ -1088,22 +1089,22 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumTopK) {
   AsyncGenerator<util::optional<ExecBatch>> sink_gen;
 
   SelectKOptions options = SelectKOptions::TopKDefault(/*k=*/1, {"str"});
-  ASSERT_OK(
-      Declaration::Sequence(
-          {
-              {"source",
-               SourceNodeOptions{input.schema, input.gen(parallel, /*slow=*/false)}},
-              {"project", ProjectNodeOptions{{
-                              field_ref("str"),
-                              call("multiply", {field_ref("i32"), literal(2)}),
-                          }}},
-              {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
-                                                 /*targets=*/{"multiply(i32, 2)"},
-                                                 /*names=*/{"sum(multiply(i32, 2))"},
-                                                 /*keys=*/{"str"}}},
-              {"select_k_sink", SelectKSinkNodeOptions{options, &sink_gen}},
-          })
-          .AddToPlan(plan.get()));
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"source", SourceNodeOptions{input.schema,
+                                                 input.gen(parallel, /*slow=*/false)}},
+                    {"project", ProjectNodeOptions{{
+                                    field_ref("str"),
+                                    call("multiply", {field_ref("i32"), literal(2)}),
+                                }}},
+                    {"aggregate",
+                     AggregateNodeOptions{
+                         /*aggregates=*/{{"hash_sum", nullptr, "multiply(i32, 2)",
+                                          "sum(multiply(i32, 2))"}},
+                         /*keys=*/{"str"}}},
+                    {"select_k_sink", SelectKSinkNodeOptions{options, &sink_gen}},
+                })
+                .AddToPlan(plan.get()));
 
   ASSERT_THAT(
       StartAndCollect(plan.get(), sink_gen),
@@ -1118,18 +1119,19 @@ TEST(ExecPlanExecution, SourceScalarAggSink) {
 
   auto basic_data = MakeBasicBatches();
 
-  ASSERT_OK(Declaration::Sequence(
-                {
-                    {"source", SourceNodeOptions{basic_data.schema,
-                                                 basic_data.gen(/*parallel=*/false,
-                                                                /*slow=*/false)}},
-                    {"aggregate", AggregateNodeOptions{
-                                      /*aggregates=*/{{"sum", nullptr}, {"any", nullptr}},
-                                      /*targets=*/{"i32", "bool"},
-                                      /*names=*/{"sum(i32)", "any(bool)"}}},
-                    {"sink", SinkNodeOptions{&sink_gen}},
-                })
-                .AddToPlan(plan.get()));
+  ASSERT_OK(
+      Declaration::Sequence(
+          {
+              {"source",
+               SourceNodeOptions{basic_data.schema, basic_data.gen(/*parallel=*/false,
+                                                                   /*slow=*/false)}},
+              {"aggregate", AggregateNodeOptions{
+                                /*aggregates=*/{{"sum", nullptr, "i32", "sum(i32)"},
+                                                {"any", nullptr, "bool", "any(bool)"}},
+                            }},
+              {"sink", SinkNodeOptions{&sink_gen}},
+          })
+          .AddToPlan(plan.get()));
 
   ASSERT_THAT(
       StartAndCollect(plan.get(), sink_gen),
@@ -1156,9 +1158,9 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) {
                                               basic_data.gen(/*parallel=*/false,
                                                              /*slow=*/false)}},
                   {"aggregate",
-                   AggregateNodeOptions{/*aggregates=*/{{"tdigest", options}},
-                                        /*targets=*/{"i32"},
-                                        /*names=*/{"tdigest(i32)"}}},
+                   AggregateNodeOptions{
+                       /*aggregates=*/{{"tdigest", options, "i32", "tdigest(i32)"}},
+                   }},
                   {"sink", SinkNodeOptions{&sink_gen}},
               })
               .AddToPlan(plan.get()));
@@ -1183,10 +1185,9 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) {
                   {"source", SourceNodeOptions{data.schema, data.gen(/*parallel=*/false,
                                                                      /*slow=*/false)}},
                   {"aggregate",
-                   AggregateNodeOptions{/*aggregates=*/{{"hash_count", options}},
-                                        /*targets=*/{"i32"},
-                                        /*names=*/{"count(i32)"},
-                                        /*keys=*/{"str"}}},
+                   AggregateNodeOptions{
+                       /*aggregates=*/{{"hash_count", options, "i32", "count(i32)"}},
+                       /*keys=*/{"str"}}},
                   {"sink", SinkNodeOptions{&sink_gen}},
               })
              .AddToPlan(plan.get()));
@@ -1215,29 +1216,24 @@ TEST(ExecPlanExecution, ScalarSourceScalarAggSink) {
   // index can't be tested as it's order-dependent
   // mode/quantile can't be tested as they're technically vector kernels
-  ASSERT_OK(
-      Declaration::Sequence(
-          {
-              {"source",
-               SourceNodeOptions{scalar_data.schema, scalar_data.gen(/*parallel=*/false,
-                                                                     /*slow=*/false)}},
-              {"aggregate", AggregateNodeOptions{
-                                /*aggregates=*/{{"all", nullptr},
-                                                {"any", nullptr},
-                                                {"count", nullptr},
-                                                {"mean", nullptr},
-                                                {"product", nullptr},
-                                                {"stddev", nullptr},
-                                                {"sum", nullptr},
-                                                {"tdigest", nullptr},
-                                                {"variance", nullptr}},
-                                /*targets=*/{"b", "b", "a", "a", "a", "a", "a", "a", "a"},
-                                /*names=*/
-                                {"all(b)", "any(b)", "count(a)", "mean(a)", "product(a)",
-                                 "stddev(a)", "sum(a)", "tdigest(a)", "variance(a)"}}},
-              {"sink", SinkNodeOptions{&sink_gen}},
-          })
-          .AddToPlan(plan.get()));
+  ASSERT_OK(Declaration::Sequence(
+                {
+                    {"source", SourceNodeOptions{scalar_data.schema,
+                                                 scalar_data.gen(/*parallel=*/false,
+                                                                 /*slow=*/false)}},
+                    {"aggregate", AggregateNodeOptions{/*aggregates=*/{
+                                      {"all", nullptr, "b", "all(b)"},
+                                      {"any", nullptr, "b", "any(b)"},
+                                      {"count", nullptr, "a", "count(a)"},
+                                      {"mean", nullptr, "a", "mean(a)"},
+                                      {"product", nullptr, "a", "product(a)"},
+                                      {"stddev", nullptr, "a", "stddev(a)"},
+                                      {"sum", nullptr, "a", "sum(a)"},
+                                      {"tdigest", nullptr, "a", "tdigest(a)"},
+                                      {"variance", nullptr, "a", "variance(a)"}}}},
+                    {"sink", SinkNodeOptions{&sink_gen}},
+                })
+                .AddToPlan(plan.get()));
 
   ASSERT_THAT(
       StartAndCollect(plan.get(), sink_gen),
@@ -1267,18 +1263,18 @@ TEST(ExecPlanExecution, ScalarSourceGroupedSum) {
   scalar_data.schema = schema({field("a", int32()), field("b", boolean())});
 
   SortOptions options({SortKey("b", SortOrder::Descending)});
-  ASSERT_OK(Declaration::Sequence(
-                {
-                    {"source", SourceNodeOptions{scalar_data.schema,
-                                                 scalar_data.gen(/*parallel=*/false,
-                                                                 /*slow=*/false)}},
-                    {"aggregate",
-                     AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr}},
-                                          /*targets=*/{"a"}, /*names=*/{"hash_sum(a)"},
-                                          /*keys=*/{"b"}}},
-                    {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
-                })
-                .AddToPlan(plan.get()));
+  ASSERT_OK(
+      Declaration::Sequence(
+          {
+              {"source",
+               SourceNodeOptions{scalar_data.schema, scalar_data.gen(/*parallel=*/false,
+                                                                     /*slow=*/false)}},
+              {"aggregate", AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr,
+                                                                  "a", "hash_sum(a)"}},
+                                                 /*keys=*/{"b"}}},
+              {"order_by_sink", OrderBySinkNodeOptions{options, &sink_gen}},
+          })
+          .AddToPlan(plan.get()));
 
   ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
               Finishes(ResultWith(UnorderedElementsAreArray({
diff --git a/cpp/src/arrow/compute/exec/test_util.cc b/cpp/src/arrow/compute/exec/test_util.cc
index 40512f868c3cc..1e09cb742fab7 100644
--- a/cpp/src/arrow/compute/exec/test_util.cc
+++ b/cpp/src/arrow/compute/exec/test_util.cc
@@ -337,10 +337,12 @@ bool operator==(const Declaration& l, const Declaration& r) {
 
       if (l_agg->options == nullptr || r_agg->options == nullptr) return false;
       if (!l_agg->options->Equals(*r_agg->options)) return false;
+
+      if (l_agg->target != r_agg->target) return false;
+      if (l_agg->name != r_agg->name) return false;
     }
 
-    return l_opts->targets == r_opts->targets && l_opts->names == r_opts->names &&
-           l_opts->keys == r_opts->keys;
+    return l_opts->keys == r_opts->keys;
   }
 
   if (l.factory_name == "order_by_sink") {
@@ -400,24 +402,14 @@ static inline void PrintToImpl(const std::string& factory_name,
 
     *os << "aggregates={";
     for (const auto& agg : o->aggregates) {
-      *os << agg.function << "<";
+      *os << "function=" << agg.function << "<";
       if (agg.options) PrintTo(*agg.options, os);
       *os << ">,";
+      *os << "target=" << agg.target.ToString() << ",";
+      *os << "name=" << agg.name;
     }
     *os << "},";
 
-    *os << "targets={";
-    for (const auto& target : o->targets) {
-      *os << target.ToString() << ",";
-    }
-    *os << "},";
-
-    *os << "names={";
-    for (const auto& name : o->names) {
-      *os << name << ",";
-    }
-    *os << "}";
-
     if (!o->keys.empty()) {
       *os << ",keys={";
       for (const auto& key : o->keys) {
diff --git a/cpp/src/arrow/compute/exec/tpch_benchmark.cc b/cpp/src/arrow/compute/exec/tpch_benchmark.cc
index 82584f58e936c..54ac7cbdbf514 100644
--- a/cpp/src/arrow/compute/exec/tpch_benchmark.cc
+++ b/cpp/src/arrow/compute/exec/tpch_benchmark.cc
@@ -77,21 +77,18 @@ std::shared_ptr<ExecPlan> Plan_Q1(AsyncGenerator<util::optional<ExecBatch>>* sink_gen,
   auto sum_opts = std::make_shared<ScalarAggregateOptions>(ScalarAggregateOptions::Defaults());
   auto count_opts = std::make_shared<CountOptions>(CountOptions::CountMode::ALL);
-  std::vector<internal::Aggregate> aggs = {
-      {"hash_sum", sum_opts},  {"hash_sum", sum_opts},  {"hash_sum", sum_opts},
-      {"hash_sum", sum_opts},  {"hash_mean", sum_opts}, {"hash_mean", sum_opts},
-      {"hash_mean", sum_opts}, {"hash_count", count_opts}};
-
-  std::vector<FieldRef> to_aggregate = {"sum_qty", "sum_base_price", "sum_disc_price",
-                                        "sum_charge", "avg_qty", "avg_price",
-                                        "avg_disc", "sum_qty"};
-
-  std::vector<std::string> names = {"sum_qty", "sum_base_price", "sum_disc_price",
-                                    "sum_charge", "avg_qty", "avg_price",
-                                    "avg_disc", "count_order"};
+  std::vector<Aggregate> aggs = {
+      {"hash_sum", sum_opts, "sum_qty", "sum_qty"},
+      {"hash_sum", sum_opts, "sum_base_price", "sum_base_price"},
+      {"hash_sum", sum_opts, "sum_disc_price", "sum_disc_price"},
+      {"hash_sum", sum_opts, "sum_charge", "sum_charge"},
+      {"hash_mean", sum_opts, "avg_qty", "avg_qty"},
+      {"hash_mean", sum_opts, "avg_price", "avg_price"},
+      {"hash_mean", sum_opts, "avg_disc", "avg_disc"},
+      {"hash_count", count_opts, "sum_qty", "count_order"}};
 
   std::vector<FieldRef> keys = {"l_returnflag", "l_linestatus"};
-  AggregateNodeOptions agg_opts(aggs, to_aggregate, names, keys);
+  AggregateNodeOptions agg_opts(aggs, keys);
 
   SortKey l_returnflag_key("l_returnflag");
   SortKey l_linestatus_key("l_linestatus");
diff --git a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
index c2712854345d8..a8cae5b50ca12 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
@@ -306,8 +306,9 @@ BENCHMARK_TEMPLATE(ReferenceSum, SumBitmapVectorizeUnroll<int64_t>)
 // GroupBy
 //
 
-static void BenchmarkGroupBy(benchmark::State& state,
-                             std::vector<internal::Aggregate> aggregates,
+using arrow::compute::internal::GroupBy;
+
+static void BenchmarkGroupBy(benchmark::State& state, std::vector<Aggregate> aggregates,
                              std::vector<Datum> arguments, std::vector<Datum> keys) {
   for (auto _ : state) {
     ABORT_NOT_OK(GroupBy(arguments, keys, aggregates).status());
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 7b47845f234ad..82d40aba94864 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -72,7 +72,7 @@ namespace compute {
 namespace {
 
 Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys,
-                           const std::vector<internal::Aggregate>& aggregates) {
+                           const std::vector<Aggregate>& aggregates) {
   ARROW_ASSIGN_OR_RAISE(auto key_batch, ExecBatch::Make(std::move(keys)));
 
   ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_batch.GetDescriptors()));
@@ -123,16 +123,9 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys,
 
 Result<Datum> GroupByUsingExecPlan(const BatchesWithSchema& input,
                                    const std::vector<std::string>& key_names,
-                                   const std::vector<std::string>& arg_names,
-                                   const std::vector<internal::Aggregate>& aggregates,
+                                   const std::vector<Aggregate>& aggregates,
                                    bool use_threads, ExecContext* ctx) {
   std::vector<FieldRef> keys(key_names.size());
-  std::vector<FieldRef> targets(aggregates.size());
-  std::vector<std::string> names(aggregates.size());
-  for (size_t i = 0; i < aggregates.size(); ++i) {
-    names[i] = aggregates[i].function;
-    targets[i] = FieldRef(arg_names[i]);
-  }
   for (size_t i = 0; i < key_names.size(); ++i) {
     keys[i] = FieldRef(key_names[i]);
   }
@@ -144,9 +137,7 @@ Result<Datum> GroupByUsingExecPlan(const BatchesWithSchema& input,
                 {
                     {"source",
                      SourceNodeOptions{input.schema, input.gen(use_threads, /*slow=*/false)}},
-                    {"aggregate",
-                     AggregateNodeOptions{std::move(aggregates), std::move(targets),
-                                          std::move(names), std::move(keys)}},
+                    {"aggregate", AggregateNodeOptions{std::move(aggregates), std::move(keys)}},
                     {"sink", SinkNodeOptions{&sink_gen}},
                 })
                 .AddToPlan(plan.get()));
@@ -191,17 +182,15 @@ Result<Datum> GroupByUsingExecPlan(const BatchesWithSchema& input,
 /// Simpler overload where you can give the columns as datums
 Result<Datum> GroupByUsingExecPlan(const std::vector<Datum>& arguments,
                                    const std::vector<Datum>& keys,
-                                   const std::vector<internal::Aggregate>& aggregates,
+                                   const std::vector<Aggregate>& aggregates,
                                    bool use_threads, ExecContext* ctx) {
   using arrow::compute::detail::ExecBatchIterator;
 
   FieldVector scan_fields(arguments.size() + keys.size());
   std::vector<std::string> key_names(keys.size());
-  std::vector<std::string> arg_names(arguments.size());
   for (size_t i = 0; i < arguments.size(); ++i) {
     auto name = std::string("agg_") + std::to_string(i);
     scan_fields[i] = field(name, arguments[i].type());
-    arg_names[i] = std::move(name);
   }
   for (size_t i = 0; i < keys.size(); ++i) {
     auto name = std::string("key_") + std::to_string(i);
@@ -223,14 +212,14 @@ Result<Datum> GroupByUsingExecPlan(const std::vector<Datum>& arguments,
     input.batches.push_back(std::move(batch));
   }
 
-  return GroupByUsingExecPlan(input, key_names, arg_names, aggregates, use_threads, ctx);
+  return GroupByUsingExecPlan(input, key_names, aggregates, use_threads, ctx);
 }
 
-void ValidateGroupBy(const std::vector<internal::Aggregate>& aggregates,
+void ValidateGroupBy(const std::vector<Aggregate>& aggregates,
                      std::vector<Datum> arguments, std::vector<Datum> keys) {
   ASSERT_OK_AND_ASSIGN(Datum expected, NaiveGroupBy(arguments, keys, aggregates));
 
-  ASSERT_OK_AND_ASSIGN(Datum actual, GroupBy(arguments, keys, aggregates));
+  ASSERT_OK_AND_ASSIGN(Datum actual, internal::GroupBy(arguments, keys, aggregates));
 
   ASSERT_OK(expected.make_array()->ValidateFull());
   ValidateOutput(actual);
@@ -246,15 +235,27 @@ ExecContext* small_chunksize_context(bool use_threads = false) {
   return use_threads ? &ctx_with_threads : &ctx;
 }
 
-Result<Datum> GroupByTest(
-    const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
-    const std::vector<::arrow::compute::internal::Aggregate>& aggregates,
-    bool use_threads, bool use_exec_plan) {
+struct TestAggregate {
+  std::string function;
+  std::shared_ptr<FunctionOptions> options;
+};
+
+Result<Datum> GroupByTest(const std::vector<Datum>& arguments,
+                          const std::vector<Datum>& keys,
+                          const std::vector<TestAggregate>& aggregates, bool use_threads,
+                          bool use_exec_plan = false) {
+  std::vector<Aggregate> internal_aggregates;
+  int idx = 0;
+  for (auto t_agg : aggregates) {
+    internal_aggregates.push_back(
+        {t_agg.function, t_agg.options, "agg_" + std::to_string(idx), t_agg.function});
+    idx = idx + 1;
+  }
   if (use_exec_plan) {
-    return GroupByUsingExecPlan(arguments, keys, aggregates, use_threads,
+    return GroupByUsingExecPlan(arguments, keys, internal_aggregates, use_threads,
                                 small_chunksize_context(use_threads));
   } else {
-    return internal::GroupBy(arguments, keys, aggregates, use_threads,
+    return internal::GroupBy(arguments, keys, internal_aggregates, use_threads,
                              default_exec_context());
   }
 }
@@ -860,11 +861,11 @@ TEST(GroupBy, CountScalar) {
     SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
     ASSERT_OK_AND_ASSIGN(
         Datum actual,
-        GroupByUsingExecPlan(input, {"key"}, {"argument", "argument", "argument"},
+        GroupByUsingExecPlan(input, {"key"},
                              {
-                                 {"hash_count", skip_nulls},
-                                 {"hash_count", keep_nulls},
-                                 {"hash_count", count_all},
+                                 {"hash_count", skip_nulls, "argument", "hash_count"},
+                                 {"hash_count", keep_nulls, "argument", "hash_count"},
+                                 {"hash_count", count_all, "argument", "hash_count"},
                              },
                              use_threads, default_exec_context()));
     Datum expected = ArrayFromJSON(struct_({
@@ -1031,14 +1032,14 @@ TEST(GroupBy, MeanOnly) {
     auto min_count =
        std::make_shared<ScalarAggregateOptions>(/*skip_nulls=*/true, /*min_count=*/3);
     ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
-                         internal::GroupBy({table->GetColumnByName("argument"),
-                                            table->GetColumnByName("argument")},
-                                           {table->GetColumnByName("key")},
-                                           {
-                                               {"hash_mean", nullptr},
-                                               {"hash_mean", min_count},
-                                           },
-                                           use_threads));
+                         GroupByTest({table->GetColumnByName("argument"),
+                                      table->GetColumnByName("argument")},
+                                     {table->GetColumnByName("key")},
+                                     {
+                                         {"hash_mean", nullptr},
+                                         {"hash_mean", min_count},
+                                     },
+                                     use_threads));
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -1072,11 +1073,11 @@ TEST(GroupBy, SumMeanProductScalar) {
    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
     ASSERT_OK_AND_ASSIGN(
         Datum actual,
-        GroupByUsingExecPlan(input, {"key"}, {"argument", "argument", "argument"},
+        GroupByUsingExecPlan(input, {"key"},
                              {
-                                 {"hash_sum", nullptr},
-                                 {"hash_mean", nullptr},
-                                 {"hash_product", nullptr},
+                                 {"hash_sum", nullptr, "argument", "hash_sum"},
+                                 {"hash_mean", nullptr, "argument", "hash_mean"},
+                                 {"hash_product", nullptr, "argument", "hash_product"},
                              },
                              use_threads, default_exec_context()));
     Datum expected = ArrayFromJSON(struct_({
@@ -1110,7 +1111,7 @@ TEST(GroupBy, VarianceAndStddev) {
   ])");
 
   ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
-                       internal::GroupBy(
+                       GroupByTest(
                            {
                                batch->GetColumnByName("argument"),
                                batch->GetColumnByName("argument"),
@@ -1121,7 +1122,8 @@ TEST(GroupBy, VarianceAndStddev) {
                            {
                                {"hash_variance", nullptr},
                                {"hash_stddev", nullptr},
-                           }));
+                           },
+                           false));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
                               field("hash_variance", float64()),
@@ -1151,7 +1153,7 @@ TEST(GroupBy, VarianceAndStddev) {
     [null, 3]
   ])");
 
-  ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, internal::GroupBy(
+  ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, GroupByTest(
                                                    {
                                                        batch->GetColumnByName("argument"),
                                                        batch->GetColumnByName("argument"),
@@ -1162,7 +1164,8 @@ TEST(GroupBy, VarianceAndStddev) {
                                                    {
                                                        {"hash_variance", nullptr},
                                                        {"hash_stddev", nullptr},
-                                                   }));
+                                                   },
+                                                   false));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
                               field("hash_variance", float64()),
@@ -1181,7 +1184,7 @@ TEST(GroupBy, VarianceAndStddev) {
   // Test ddof
   auto variance_options = std::make_shared<VarianceOptions>(/*ddof=*/2);
   ASSERT_OK_AND_ASSIGN(aggregated_and_grouped,
-                       internal::GroupBy(
+                       GroupByTest(
                            {
                                batch->GetColumnByName("argument"),
                                batch->GetColumnByName("argument"),
@@ -1192,7 +1195,8 @@ TEST(GroupBy, VarianceAndStddev) {
                            {
                                {"hash_variance", variance_options},
                                {"hash_stddev", variance_options},
-                           }));
+                           },
+                           false));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
                               field("hash_variance", float64()),
@@ -1225,7 +1229,7 @@ TEST(GroupBy, VarianceAndStddevDecimal) {
   ])");
 
   ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
-                       internal::GroupBy(
+                       GroupByTest(
                            {
                                batch->GetColumnByName("argument0"),
                                batch->GetColumnByName("argument0"),
@@ -1240,7 +1244,8 @@ TEST(GroupBy, VarianceAndStddevDecimal) {
                                {"hash_stddev", nullptr},
                                {"hash_variance", nullptr},
                                {"hash_stddev", nullptr},
-                           }));
+                           },
+                           false));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
                               field("hash_variance", float64()),
@@ -1291,7 +1296,7 @@ TEST(GroupBy, TDigest) {
       std::make_shared<TDigestOptions>(/*q=*/0.5, /*delta=*/100, /*buffer_size=*/500,
                                        /*skip_nulls=*/false, /*min_count=*/3);
   ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
-                       internal::GroupBy(
+                       GroupByTest(
                            {
                               batch->GetColumnByName("argument"),
                                batch->GetColumnByName("argument"),
@@ -1310,7 +1315,8 @@ TEST(GroupBy, TDigest) {
                                {"hash_tdigest", keep_nulls},
                                {"hash_tdigest", min_count},
                                {"hash_tdigest", keep_nulls_min_count},
-                           }));
+                           },
+                           false));
 
   AssertDatumsApproxEqual(
       ArrayFromJSON(struct_({
@@ -1349,7 +1355,7 @@ TEST(GroupBy, TDigestDecimal) {
   ])");
 
   ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
-                       internal::GroupBy(
+                       GroupByTest(
                            {
                                batch->GetColumnByName("argument0"),
                                batch->GetColumnByName("argument1"),
@@ -1358,7 +1364,8 @@ TEST(GroupBy, TDigestDecimal) {
                            {
                                {"hash_tdigest", nullptr},
                                {"hash_tdigest", nullptr},
-                           }));
+                           },
+                           false));
 
   AssertDatumsApproxEqual(
       ArrayFromJSON(struct_({
@@ -1403,7 +1410,7 @@ TEST(GroupBy, ApproximateMedian) {
   auto keep_nulls_min_count = std::make_shared<ScalarAggregateOptions>(
       /*skip_nulls=*/false, /*min_count=*/3);
   ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
-                       internal::GroupBy(
+                       GroupByTest(
                            {
                               batch->GetColumnByName("argument"),
                                batch->GetColumnByName("argument"),
@@ -1418,7 +1425,8 @@ TEST(GroupBy, ApproximateMedian) {
                                {"hash_approximate_median", keep_nulls},
                                {"hash_approximate_median", min_count},
                                {"hash_approximate_median", keep_nulls_min_count},
-                           }));
+                           },
+                           false));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
                               field("hash_approximate_median", float64()),
@@ -1456,19 +1464,18 @@ TEST(GroupBy, StddevVarianceTDigestScalar) {
   for (bool use_threads : {false}) {
     SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
-    ASSERT_OK_AND_ASSIGN(Datum actual,
-                         GroupByUsingExecPlan(input, {"key"},
-                                              {"argument", "argument", "argument",
-                                               "argument1", "argument1", "argument1"},
-                                              {
-                                                  {"hash_stddev", nullptr},
-                                                  {"hash_variance", nullptr},
-                                                  {"hash_tdigest", nullptr},
-                                                  {"hash_stddev", nullptr},
-                                                  {"hash_variance", nullptr},
-                                                  {"hash_tdigest", nullptr},
-                                              },
-                                              use_threads, default_exec_context()));
+    ASSERT_OK_AND_ASSIGN(
+        Datum actual,
+        GroupByUsingExecPlan(input, {"key"},
+                             {
+                                 {"hash_stddev", nullptr, "argument", "hash_stddev"},
+                                 {"hash_variance", nullptr, "argument", "hash_variance"},
+                                 {"hash_tdigest", nullptr, "argument", "hash_tdigest"},
+                                 {"hash_stddev", nullptr, "argument1", "hash_stddev"},
+                                 {"hash_variance", nullptr, "argument1", "hash_variance"},
+                                 {"hash_tdigest", nullptr, "argument1", "hash_tdigest"},
+                             },
+                             use_threads, default_exec_context()));
 
     Datum expected = ArrayFromJSON(struct_({
                                        field("hash_stddev", float64()),
@@ -1516,25 +1523,19 @@ TEST(GroupBy, VarianceOptions) {
   for (bool use_threads : {false}) {
     SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
-    ASSERT_OK_AND_ASSIGN(Datum actual,
-                         GroupByUsingExecPlan(input, {"key"},
-                                              {
-                                                  "argument",
-                                                  "argument",
-                                                  "argument",
-                                                  "argument",
-                                                  "argument",
-                                                  "argument",
-                                              },
-                                              {
-                                                  {"hash_stddev", keep_nulls},
-                                                  {"hash_stddev", min_count},
-                                                  {"hash_stddev", keep_nulls_min_count},
-                                                  {"hash_variance", keep_nulls},
-                                                  {"hash_variance", min_count},
-                                                  {"hash_variance", keep_nulls_min_count},
-                                              },
-                                              use_threads, default_exec_context()));
+    ASSERT_OK_AND_ASSIGN(
+        Datum actual,
+        GroupByUsingExecPlan(
+            input, {"key"},
+            {
+                {"hash_stddev", keep_nulls, "argument", "hash_stddev"},
+                {"hash_stddev", min_count, "argument", "hash_stddev"},
+                {"hash_stddev", keep_nulls_min_count, "argument", "hash_stddev"},
+                {"hash_variance", keep_nulls, "argument", "hash_variance"},
+                {"hash_variance", min_count, "argument", "hash_variance"},
+                {"hash_variance", keep_nulls_min_count, "argument", "hash_variance"},
+            },
+            use_threads, default_exec_context()));
     Datum expected = ArrayFromJSON(struct_({
                                        field("hash_stddev", float64()),
                                        field("hash_stddev", float64()),
@@ -1553,25 +1554,19 @@ TEST(GroupBy, VarianceOptions) {
     ValidateOutput(expected);
     AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
 
-    ASSERT_OK_AND_ASSIGN(actual,
-                         GroupByUsingExecPlan(input, {"key"},
-                                              {
-                                                  "argument1",
-                                                  "argument1",
-                                                  "argument1",
-                                                  "argument1",
-                                                  "argument1",
-                                                  "argument1",
-                                              },
-                                              {
-                                                  {"hash_stddev", keep_nulls},
-                                                  {"hash_stddev", min_count},
-                                                  {"hash_stddev", keep_nulls_min_count},
-                                                  {"hash_variance", keep_nulls},
-                                                  {"hash_variance", min_count},
-                                                  {"hash_variance", keep_nulls_min_count},
-                                              },
-                                              use_threads, default_exec_context()));
+    ASSERT_OK_AND_ASSIGN(
+        actual,
+        GroupByUsingExecPlan(
+            input, {"key"},
+            {
+                {"hash_stddev", keep_nulls, "argument1", "hash_stddev"},
+                {"hash_stddev", min_count, "argument1", "hash_stddev"},
+                {"hash_stddev", keep_nulls_min_count, "argument1", "hash_stddev"},
+                {"hash_variance", keep_nulls, "argument1", "hash_variance"},
+                {"hash_variance", min_count, "argument1", "hash_variance"},
+                {"hash_variance", keep_nulls_min_count, "argument1", "hash_variance"},
+            },
+            use_threads, default_exec_context()));
     expected = ArrayFromJSON(struct_({
                                  field("hash_stddev", float64()),
                                  field("hash_stddev", float64()),
@@ -1997,9 +1992,9 @@ TEST(GroupBy, MinMaxScalar) {
     SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
     ASSERT_OK_AND_ASSIGN(
         Datum actual,
-        GroupByUsingExecPlan(input, {"key"}, {"argument", "argument", "argument"},
-                             {{"hash_min_max", nullptr}}, use_threads,
-                             default_exec_context()));
+        GroupByUsingExecPlan(input, {"key"},
+                             {{"hash_min_max", nullptr, "argument", "hash_min_max"}},
+                             use_threads, default_exec_context()));
     Datum expected =
         ArrayFromJSON(struct_({
                           field("hash_min_max",
@@ -2062,14 +2057,14 @@ TEST(GroupBy, AnyAndAll) {
                          },
                          {table->GetColumnByName("key")},
                          {
-                             {"hash_any", no_min},
-                             {"hash_any", min_count},
-                             {"hash_any", keep_nulls},
-                             {"hash_any", keep_nulls_min_count},
-                             {"hash_all", no_min},
-                             {"hash_all", min_count},
-                             {"hash_all", keep_nulls},
-                             {"hash_all", keep_nulls_min_count},
+                             {"hash_any", no_min, "agg_0", "hash_any"},
+                             {"hash_any", min_count, "agg_1", "hash_any"},
+                             {"hash_any", keep_nulls, "agg_2", "hash_any"},
+                             {"hash_any", keep_nulls_min_count, "agg_3", "hash_any"},
+                             {"hash_all", no_min, "agg_4", "hash_all"},
+                             {"hash_all", min_count, "agg_5", "hash_all"},
+                             {"hash_all", keep_nulls, "agg_6", "hash_all"},
+                             {"hash_all", keep_nulls_min_count, "agg_7", "hash_all"},
                          },
                         use_threads));
     SortBy({"key_0"}, &aggregated_and_grouped);
@@ -2122,12 +2117,11 @@ TEST(GroupBy, AnyAllScalar) {
     ASSERT_OK_AND_ASSIGN(
         Datum actual,
         GroupByUsingExecPlan(input, {"key"},
-                             {"argument", "argument", "argument", "argument"},
                             {
-                                 {"hash_any", nullptr},
-                                 {"hash_all", nullptr},
-                                 {"hash_any", keep_nulls},
-                                 {"hash_all", keep_nulls},
+                                 {"hash_any", nullptr, "argument", "hash_any"},
+                                 {"hash_all", nullptr, "argument", "hash_all"},
+                                 {"hash_any", keep_nulls, "argument", "hash_any"},
+                                 {"hash_all", keep_nulls, "argument", "hash_all"},
                              },
                              use_threads, default_exec_context()));
     Datum expected = ArrayFromJSON(struct_({
@@ -2184,22 +2178,23 @@ TEST(GroupBy, CountDistinct) {
     [3, null]
   ])"});
 
-  ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
-                       internal::GroupBy(
-                           {
-                               table->GetColumnByName("argument"),
-                               table->GetColumnByName("argument"),
-                               table->GetColumnByName("argument"),
-                           },
-                           {
-                               table->GetColumnByName("key"),
-                           },
-                           {
-                               {"hash_count_distinct", all},
-                               {"hash_count_distinct", only_valid},
-                               {"hash_count_distinct", only_null},
-                           },
-                           use_threads));
+  ASSERT_OK_AND_ASSIGN(
+      Datum aggregated_and_grouped,
+      internal::GroupBy(
+          {
+              table->GetColumnByName("argument"),
+              table->GetColumnByName("argument"),
+              table->GetColumnByName("argument"),
+          },
+          {
+              table->GetColumnByName("key"),
+          },
+          {
+              {"hash_count_distinct", all, "agg_0", "hash_count_distinct"},
+              {"hash_count_distinct", only_valid, "agg_1", "hash_count_distinct"},
+              {"hash_count_distinct", only_null, "agg_2", "hash_count_distinct"},
+          },
+          use_threads));
   SortBy({"key_0"}, &aggregated_and_grouped);
   ValidateOutput(aggregated_and_grouped);
@@ -2250,22 +2245,23 @@ TEST(GroupBy, CountDistinct) {
     ["b", null]
   ])"});
 
-  ASSERT_OK_AND_ASSIGN(aggregated_and_grouped,
-                       internal::GroupBy(
-                           {
-                               table->GetColumnByName("argument"),
-                               table->GetColumnByName("argument"),
-                               table->GetColumnByName("argument"),
-                           },
-                           {
-                               table->GetColumnByName("key"),
-                           },
-                           {
-                               {"hash_count_distinct", all},
-                               {"hash_count_distinct", only_valid},
-                               {"hash_count_distinct", only_null},
-                           },
-                           use_threads));
+  ASSERT_OK_AND_ASSIGN(
+      aggregated_and_grouped,
+      internal::GroupBy(
+          {
+              table->GetColumnByName("argument"),
+              table->GetColumnByName("argument"),
+              table->GetColumnByName("argument"),
+          },
+          {
+              table->GetColumnByName("key"),
+          },
+          {
+              {"hash_count_distinct", all, "agg_0", "hash_count_distinct"},
+              {"hash_count_distinct", only_valid, "agg_1", "hash_count_distinct"},
+              {"hash_count_distinct", only_null, "agg_2", "hash_count_distinct"},
+          },
+          use_threads));
   ValidateOutput(aggregated_and_grouped);
   SortBy({"key_0"}, &aggregated_and_grouped);
@@ -2296,22 +2292,23 @@ TEST(GroupBy, CountDistinct) {
   ])",
   });
 
-  ASSERT_OK_AND_ASSIGN(aggregated_and_grouped,
-                       internal::GroupBy(
-                           {
-                               table->GetColumnByName("argument"),
-                               table->GetColumnByName("argument"),
-                               table->GetColumnByName("argument"),
-                           },
-                           {
-                               table->GetColumnByName("key"),
-                           },
-                           {
-                               {"hash_count_distinct", all},
-                               {"hash_count_distinct", only_valid},
-                               {"hash_count_distinct", only_null},
-                           },
-                           use_threads));
+  ASSERT_OK_AND_ASSIGN(
+      aggregated_and_grouped,
+      internal::GroupBy(
+          {
+              table->GetColumnByName("argument"),
+              table->GetColumnByName("argument"),
+              table->GetColumnByName("argument"),
+          },
+          {
+              table->GetColumnByName("key"),
+          },
+          {
+              {"hash_count_distinct", all, "agg_0", "hash_count_distinct"},
+              {"hash_count_distinct", only_valid, "agg_1", "hash_count_distinct"},
+              {"hash_count_distinct", only_null, "agg_2", "hash_count_distinct"},
+          },
+          use_threads));
   ValidateOutput(aggregated_and_grouped);
   SortBy({"key_0"}, &aggregated_and_grouped);
@@ -2379,9 +2376,9 @@ TEST(GroupBy, Distinct) {
                                table->GetColumnByName("key"),
                            },
                            {
-                               {"hash_distinct", all},
-                               {"hash_distinct", only_valid},
-                               {"hash_distinct", only_null},
+                               {"hash_distinct", all, "agg_0", "hash_distinct"},
+                               {"hash_distinct", only_valid, "agg_1", "hash_distinct"},
+                               {"hash_distinct", only_null, "agg_2", "hash_distinct"},
                            },
                            use_threads));
     ValidateOutput(aggregated_and_grouped);
@@ -2452,9 +2449,9 @@ TEST(GroupBy, Distinct) {
                                table->GetColumnByName("key"),
                            },
                            {
-                               {"hash_distinct", all},
-                               {"hash_distinct", only_valid},
-                               {"hash_distinct", only_null},
+                               {"hash_distinct", all, "agg_0", "hash_distinct"},
+                               {"hash_distinct", only_valid, "agg_1", "hash_distinct"},
+                               {"hash_distinct", only_null, "agg_2", "hash_distinct"},
                            },
                            use_threads));
     ValidateOutput(aggregated_and_grouped);
@@ -2744,8 +2741,8 @@ TEST(GroupBy, OneScalar) {
     SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
     ASSERT_OK_AND_ASSIGN(
         Datum actual, GroupByUsingExecPlan(
-                          input, {"key"}, {"argument", "argument", "argument"},
-                          {{"hash_one", nullptr}}, use_threads, default_exec_context()));
+                          input, {"key"}, {{"hash_one", nullptr, "argument", "hash_one"}},
+                          use_threads, default_exec_context()));
 
     const auto& struct_arr = actual.array_as<StructArray>();
     // Check the key column
@@ -2805,7 +2802,7 @@ TEST(GroupBy, ListNumeric) {
                                  table->GetColumnByName("key"),
                              },
                              {
-                                 {"hash_list", nullptr},
+                                 {"hash_list", nullptr, "agg_0", "hash_list"},
                              },
                              use_threads));
       ValidateOutput(aggregated_and_grouped);
@@ -2876,7 +2873,7 @@ TEST(GroupBy, ListNumeric) {
                                  table->GetColumnByName("key"),
                              },
                              {
-                                 {"hash_list", nullptr},
+                                 {"hash_list", nullptr, "agg_0", "hash_list"},
                             },
                              use_threads));
       ValidateOutput(aggregated_and_grouped);
@@ -2945,7 +2942,7 @@ TEST(GroupBy, ListBinaryTypes) {
                                  table->GetColumnByName("key"),
                              },
                              {
-                                 {"hash_list", nullptr},
+                                 {"hash_list", nullptr, "agg_0", "hash_list"},
                             },
                              use_threads));
       ValidateOutput(aggregated_and_grouped);
@@ -3007,7 +3004,7 @@ TEST(GroupBy, ListBinaryTypes) {
                                  table->GetColumnByName("key"),
                              },
                              {
-                                 {"hash_list", nullptr},
+                                 {"hash_list", nullptr, "agg_0", "hash_list"},
                             },
                              use_threads));
       ValidateOutput(aggregated_and_grouped);
@@ -3238,12 +3235,12 @@ TEST(GroupBy, CountAndSum) {
                                batch->GetColumnByName("key"),
                            },
                            {
-                               {"hash_count", count_options},
-                               {"hash_count", count_nulls},
-                               {"hash_count", count_all},
-                               {"hash_sum", nullptr},
-                               {"hash_sum", min_count},
-                               {"hash_sum", nullptr},
+                               {"hash_count", count_options, "agg_0", "hash_count"},
+                               {"hash_count", count_nulls, "agg_1", "hash_count"},
+                               {"hash_count", count_all, "agg_2", "hash_count"},
+                               {"hash_sum", nullptr, "agg_3", "hash_sum"},
+                               {"hash_sum", min_count, "agg_4", "hash_sum"},
+                               {"hash_sum", nullptr, "agg_5", "hash_sum"},
                            }));
 
   AssertDatumsEqual(
@@ -3295,9 +3292,9 @@ TEST(GroupBy, Product) {
                                batch->GetColumnByName("key"),
                            },
                            {
-                               {"hash_product", nullptr},
-                               {"hash_product", nullptr},
-                               {"hash_product", min_count},
+                               {"hash_product", nullptr, "agg_0", "hash_product"},
+                               {"hash_product", nullptr, "agg_1", "hash_product"},
+                               {"hash_product", min_count, "agg_2", "hash_product"},
                            }));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -3322,16 +3319,17 @@ TEST(GroupBy, Product) {
     [8589934593, 1]
   ])");
 
-  ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, internal::GroupBy(
-                                                   {
-                                                       batch->GetColumnByName("argument"),
-                                                   },
-                                                   {
-                                                       batch->GetColumnByName("key"),
-                                                   },
-                                                   {
-                                                       {"hash_product", nullptr},
-                                                   }));
+  ASSERT_OK_AND_ASSIGN(aggregated_and_grouped,
+                       internal::GroupBy(
+                           {
+                               batch->GetColumnByName("argument"),
+                           },
+                           {
+                               batch->GetColumnByName("key"),
+                           },
+                           {
+                               {"hash_product", nullptr, "agg_0", "hash_product"},
+                           }));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
                               field("hash_product", int64()),
@@ -3374,12 +3372,12 @@ TEST(GroupBy, SumMeanProductKeepNulls) {
                                batch->GetColumnByName("key"),
                            },
                           {
-                               {"hash_sum", keep_nulls},
-                               {"hash_sum", min_count},
-                               {"hash_mean", keep_nulls},
-                               {"hash_mean", min_count},
-                               {"hash_product", keep_nulls},
-                               {"hash_product", min_count},
+                               {"hash_sum", keep_nulls, "agg_0", "hash_sum"},
+                               {"hash_sum", min_count, "agg_1", "hash_sum"},
+                               {"hash_mean", keep_nulls, "agg_2", "hash_mean"},
+                               {"hash_mean", min_count, "agg_3", "hash_mean"},
+                               {"hash_product", keep_nulls, "agg_4", "hash_product"},
+                               {"hash_product", min_count, "agg_5", "hash_product"},
                            }));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
@@ -3423,7 +3421,7 @@ TEST(GroupBy, SumOnlyStringAndDictKeys) {
                          internal::GroupBy({batch->GetColumnByName("argument")},
                                            {batch->GetColumnByName("key")},
                                            {
-                                               {"hash_sum", nullptr},
+                                               {"hash_sum", nullptr, "agg_0", "hash_sum"},
                                            }));
     SortBy({"key_0"}, &aggregated_and_grouped);
@@ -3464,13 +3462,12 @@ TEST(GroupBy, ConcreteCaseWithValidateGroupBy) {
   std::shared_ptr<FunctionOptions> non_null =
       std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
 
-  using internal::Aggregate;
   for (auto agg : {
-           Aggregate{"hash_sum", nullptr},
-           Aggregate{"hash_count", non_null},
-           Aggregate{"hash_count", nulls},
-           Aggregate{"hash_min_max", nullptr},
-           Aggregate{"hash_min_max", keepna},
+           Aggregate{"hash_sum", nullptr, "agg_0", "hash_sum"},
+           Aggregate{"hash_count", non_null, "agg_1", "hash_count"},
+           Aggregate{"hash_count", nulls, "agg_2", "hash_count"},
+           Aggregate{"hash_min_max", nullptr, "agg_3", "hash_min_max"},
+           Aggregate{"hash_min_max", keepna, "agg_4", "hash_min_max"},
       }) {
     SCOPED_TRACE(agg.function);
     ValidateGroupBy({agg}, {batch->GetColumnByName("argument")},
@@ -3492,10 +3489,9 @@ TEST(GroupBy, CountNull) {
   std::shared_ptr<FunctionOptions> skipna =
      std::make_shared<CountOptions>(CountOptions::ONLY_VALID);
 
-  using internal::Aggregate;
   for (auto agg : {
-           Aggregate{"hash_count", keepna},
-           Aggregate{"hash_count", skipna},
+           Aggregate{"hash_count", keepna, "agg_0", "hash_count"},
+           Aggregate{"hash_count", skipna, "agg_1", "hash_count"},
       }) {
     SCOPED_TRACE(agg.function);
     ValidateGroupBy({agg}, {batch->GetColumnByName("argument")},
@@ -3519,7 +3515,7 @@ TEST(GroupBy, RandomArraySum) {
 
     ValidateGroupBy(
         {
-            {"hash_sum", options},
+            {"hash_sum", options, "agg_0", "hash_sum"},
         },
        {batch->GetColumnByName("argument")}, {batch->GetColumnByName("key")});
   }
@@ -3552,9 +3548,9 @@ TEST(GroupBy, WithChunkedArray) {
                                table->GetColumnByName("key"),
                            },
                            {
-                               {"hash_count", nullptr},
-                               {"hash_sum", nullptr},
-                               {"hash_min_max", nullptr},
+                               {"hash_count", nullptr, "agg_0", "hash_count"},
+                               {"hash_sum", nullptr, "agg_1", "hash_sum"},
+                               {"hash_min_max", nullptr, "agg_2", "hash_min_max"},
                            }));
 
   AssertDatumsEqual(ArrayFromJSON(struct_({
@@ -3590,7 +3586,7 @@ TEST(GroupBy, MinMaxWithNewGroupsInChunkedArray) {
                                table->GetColumnByName("key"),
                            },
                            {
-                               {"hash_min_max", nullptr},
+                               {"hash_min_max", nullptr, "agg_1", "hash_min_max"},
                            }));
 
   AssertDatumsEqual(ArrayFromJSON(struct_({
@@ -3626,7 +3622,7 @@ TEST(GroupBy, SmallChunkSizeSumOnly) {
                          internal::GroupBy({batch->GetColumnByName("argument")},
                                            {batch->GetColumnByName("key")},
                                            {
-                                               {"hash_sum", nullptr},
+                                               {"hash_sum", nullptr, "agg_0", "hash_sum"},
                                            },
                                            small_chunksize_context()));
   AssertDatumsEqual(ArrayFromJSON(struct_({
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 02f658181c06f..3cd5f1fcc26fb 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -675,10 +675,8 @@ Result<int64_t> AsyncScanner::CountRows() {
                                          std::move(fragment_gen)),
                      options}},
                 {"project", compute::ProjectNodeOptions{{options->filter}, {"mask"}}},
-                {"aggregate", compute::AggregateNodeOptions{{compute::internal::Aggregate{
-                                  "sum", nullptr}},
-                              /*targets=*/{"mask"},
-                              /*names=*/{"selected_count"}}},
+                {"aggregate", compute::AggregateNodeOptions{{compute::Aggregate{
+                                  "sum", nullptr, "mask", "selected_count"}}}},
                 {"sink", compute::SinkNodeOptions{&sink_gen}},
             })
             .AddToPlan(plan.get()));
diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index b7dcb8b18d2b2..5316f63d08015 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -1837,11 +1837,9 @@ TEST(ScanNode, MinimalScalarAggEndToEnd) {
   // pipe the projection into a scalar aggregate node
   ASSERT_OK_AND_ASSIGN(
       compute::ExecNode * aggregate,
-      compute::MakeExecNode(
-          "aggregate", plan.get(), {project},
-          compute::AggregateNodeOptions{{compute::internal::Aggregate{"sum", nullptr}},
-                                        /*targets=*/{"a * 2"},
-                                        /*names=*/{"sum(a * 2)"}}));
+      compute::MakeExecNode("aggregate", plan.get(), {project},
+                            compute::AggregateNodeOptions{{compute::Aggregate{
+                                "sum", nullptr, "a * 2", "sum(a * 2)"}}}));
 
   // finally, pipe the aggregate node into a sink node
   AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
@@ -1927,12 +1925,11 @@ TEST(ScanNode, MinimalGroupedAggEndToEnd) {
   // pipe the projection into a grouped aggregate node
   ASSERT_OK_AND_ASSIGN(
       compute::ExecNode * aggregate,
-      compute::MakeExecNode("aggregate", plan.get(), {project},
-                            compute::AggregateNodeOptions{
-                                {compute::internal::Aggregate{"hash_sum", nullptr}},
-                                /*targets=*/{"a * 2"},
-                                /*names=*/{"sum(a * 2)"},
-                                /*keys=*/{"b"}}));
+      compute::MakeExecNode(
+          "aggregate", plan.get(), {project},
+          compute::AggregateNodeOptions{
+              {compute::Aggregate{"hash_sum", nullptr, "a * 2", "sum(a * 2)"}},
+              /*keys=*/{"b"}}));
 
   // finally, pipe the aggregate node into a sink node
   AsyncGenerator<util::optional<compute::ExecBatch>> sink_gen;
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 302ac99c36a39..9e43eb4eb9c76 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2408,7 +2408,7 @@ cdef extern from * namespace "arrow::compute":
 cdef extern from "arrow/compute/exec/aggregate.h" namespace \
         "arrow::compute::internal" nogil:
 
-    cdef cppclass CAggregate "arrow::compute::internal::Aggregate":
+    cdef cppclass CAggregate "arrow::compute::Aggregate":
         c_string function
         shared_ptr[CFunctionOptions] options
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index bf5a8d0682181..84f6ee54fc74a 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -432,8 +432,8 @@ ExecNode_Project <- function(input, exprs, names) {
   .Call(`_arrow_ExecNode_Project`, input, exprs, names)
 }
 
-ExecNode_Aggregate <- function(input, options, target_names, out_field_names, key_names) {
-  .Call(`_arrow_ExecNode_Aggregate`, input, options, target_names, out_field_names, key_names)
+ExecNode_Aggregate <- function(input, options, key_names) {
+  .Call(`_arrow_ExecNode_Aggregate`, input, options, key_names)
 }
 
 ExecNode_Join <- function(input, type, right_data, left_keys, right_keys, left_output, right_output, output_suffix_for_left, output_suffix_for_right) {
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index c40c61e98a7cd..513b861d4148d 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -14,7 +14,6 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 ExecPlan <- R6Class("ExecPlan",
   inherit = ArrowObject,
   public = list(
@@ -73,8 +72,6 @@ ExecPlan <- R6Class("ExecPlan",
       group_vars <- dplyr::group_vars(.data)
       grouped <- length(group_vars) > 0
 
-      # Collect the target names first because we have to add back the group vars
-      target_names <- names(.data)
       .data <- ensure_group_vars(.data)
       .data <- ensure_arrange_vars(.data) # this sets .data$temp_columns
 
@@ -115,10 +112,15 @@ ExecPlan <- R6Class("ExecPlan",
         })
       }
 
+      .data$aggregations <- imap(.data$aggregations, function(x, name) {
+        # Embed the name inside the aggregation objects. `target` and `name`
+        # are the same because we just Project()ed the data that way above
+        x[["name"]] <- x[["target"]] <- name
+        x
+      })
+
       node <- node$Aggregate(
-        options = map(.data$aggregations, ~ .[c("fun", "options")]),
-        target_names = names(.data$aggregations),
-        out_field_names = names(.data$aggregations),
+        options = .data$aggregations,
         key_names = group_vars
       )
 
@@ -179,7 +181,6 @@ ExecPlan <- R6Class("ExecPlan",
           temp_columns = names(.data$temp_columns)
         )
       }
-
       # This is only safe because we are going to evaluate queries that end
       # with head/tail first, then evaluate any subsequent query as a new query
       if (!is.null(.data$head)) {
@@ -304,9 +305,9 @@ ExecNode <- R6Class("ExecNode",
       assert_is(expr, "Expression")
       self$preserve_extras(ExecNode_Filter(self, expr))
     },
-    Aggregate = function(options, target_names, out_field_names, key_names) {
+    Aggregate = function(options, key_names) {
      out <- self$preserve_extras(
-        ExecNode_Aggregate(self, options, target_names, out_field_names, key_names)
+        ExecNode_Aggregate(self, options, key_names)
       )
       # dplyr drops top-level attributes when you call summarize()
       out$extras$source_schema$metadata[["r"]]$attributes <- NULL
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 947270199ab12..62c8b6695c868 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -964,15 +964,13 @@ BEGIN_CPP11
 END_CPP11
 }
 // compute-exec.cpp
-std::shared_ptr<compute::ExecNode> ExecNode_Aggregate(const std::shared_ptr<compute::ExecNode>& input, cpp11::list options, std::vector<std::string> target_names, std::vector<std::string> out_field_names, std::vector<std::string> key_names);
-extern "C" SEXP _arrow_ExecNode_Aggregate(SEXP input_sexp, SEXP options_sexp, SEXP target_names_sexp, SEXP out_field_names_sexp, SEXP key_names_sexp){
+std::shared_ptr<compute::ExecNode> ExecNode_Aggregate(const std::shared_ptr<compute::ExecNode>& input, cpp11::list options, std::vector<std::string> key_names);
+extern "C" SEXP _arrow_ExecNode_Aggregate(SEXP input_sexp, SEXP options_sexp, SEXP key_names_sexp){
 BEGIN_CPP11
     arrow::r::Input<const std::shared_ptr<compute::ExecNode>&>::type input(input_sexp);
     arrow::r::Input<cpp11::list>::type options(options_sexp);
-    arrow::r::Input<std::vector<std::string>>::type target_names(target_names_sexp);
-    arrow::r::Input<std::vector<std::string>>::type out_field_names(out_field_names_sexp);
     arrow::r::Input<std::vector<std::string>>::type key_names(key_names_sexp);
-    return cpp11::as_sexp(ExecNode_Aggregate(input, options, target_names, out_field_names, key_names));
+    return cpp11::as_sexp(ExecNode_Aggregate(input, options, key_names));
 END_CPP11
 }
 // compute-exec.cpp
@@ -5248,7 +5246,7 @@ static const R_CallMethodDef CallEntries[] = {
     { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14},
    { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2},
    { "_arrow_ExecNode_Project", (DL_FUNC) &_arrow_ExecNode_Project, 3},
-    { "_arrow_ExecNode_Aggregate", (DL_FUNC) &_arrow_ExecNode_Aggregate, 5},
+    { "_arrow_ExecNode_Aggregate", (DL_FUNC) &_arrow_ExecNode_Aggregate, 3},
    { "_arrow_ExecNode_Join", (DL_FUNC) &_arrow_ExecNode_Join, 9},
    { "_arrow_ExecNode_Union", (DL_FUNC) &_arrow_ExecNode_Union, 2},
    { "_arrow_ExecNode_SourceNode", (DL_FUNC) &_arrow_ExecNode_SourceNode, 2},
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 089d1e71eba18..76112b4cefd97 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -228,29 +228,26 @@ std::shared_ptr<compute::ExecNode> ExecNode_Project(
 // [[arrow::export]]
 std::shared_ptr<compute::ExecNode> ExecNode_Aggregate(
     const std::shared_ptr<compute::ExecNode>& input, cpp11::list options,
-    std::vector<std::string> target_names, std::vector<std::string> out_field_names,
     std::vector<std::string> key_names) {
-  std::vector<arrow::compute::internal::Aggregate> aggregates;
+  std::vector<arrow::compute::Aggregate> aggregates;
 
   for (cpp11::list name_opts : options) {
-    auto name = cpp11::as_cpp<std::string>(name_opts[0]);
-    auto opts = make_compute_options(name, name_opts[1]);
+    auto function = cpp11::as_cpp<std::string>(name_opts["fun"]);
+    auto opts = make_compute_options(function, name_opts["options"]);
+    auto target = cpp11::as_cpp<std::string>(name_opts["target"]);
+    auto name = cpp11::as_cpp<std::string>(name_opts["name"]);
 
-    aggregates.push_back(
-        arrow::compute::internal::Aggregate{std::move(name), std::move(opts)});
+    aggregates.push_back(arrow::compute::Aggregate{std::move(function), opts,
+                                                   std::move(target), std::move(name)});
  }
 
-  std::vector<arrow::FieldRef> targets, keys;
-  for (auto&& name : target_names) {
-    targets.emplace_back(std::move(name));
-  }
+  std::vector<arrow::FieldRef> keys;
   for (auto&& name : key_names) {
     keys.emplace_back(std::move(name));
   }
 
   return MakeExecNodeOrStop(
       "aggregate", input->plan(), {input.get()},
-      compute::AggregateNodeOptions{std::move(aggregates), std::move(targets),
-                                    std::move(out_field_names), std::move(keys)});
+      compute::AggregateNodeOptions{std::move(aggregates), std::move(keys)});
 }
 
 // [[arrow::export]]
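Note for reviewers: a minimal sketch of the consolidated API this diff introduces. Each arrow::compute::Aggregate now carries its own target field and output name ({function, options, target, name}), so AggregateNodeOptions takes only the aggregates plus optional grouping keys. The column names below ("x", "key") are illustrative, not part of the patch:

  // Before: Aggregate{function, options} plus parallel targets/names vectors
  // passed separately to AggregateNodeOptions.
  // After: each Aggregate is self-describing, and the node options shrink to
  // {aggregates, keys}.
  arrow::compute::AggregateNodeOptions agg_opts{
      /*aggregates=*/{{"hash_sum", /*options=*/nullptr, /*target=*/"x",
                       /*name=*/"sum(x)"}},
      /*keys=*/{"key"}};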