diff --git a/cpp/src/arrow/acero/aggregate_node.cc b/cpp/src/arrow/acero/aggregate_node.cc
index 0366809b933b3..7adbb8c565cfa 100644
--- a/cpp/src/arrow/acero/aggregate_node.cc
+++ b/cpp/src/arrow/acero/aggregate_node.cc
@@ -350,7 +350,14 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
       kernel_intypes[i] = in_types;
       ARROW_ASSIGN_OR_RAISE(const Kernel* kernel,
                             function->DispatchExact(kernel_intypes[i]));
-      kernels[i] = static_cast<const ScalarAggregateKernel*>(kernel);
+      const ScalarAggregateKernel* agg_kernel =
+          static_cast<const ScalarAggregateKernel*>(kernel);
+      if (concurrency > 1 && agg_kernel->ordered) {
+        return Status::NotImplemented(
+            "Using ordered aggregator in multi-threaded execution is not supported");
+      }
+
+      kernels[i] = agg_kernel;
 
       if (aggregates[i].options == nullptr) {
         DCHECK(!function->doc().options_required);
@@ -391,12 +398,13 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
     auto aggregates = aggregate_options.aggregates;
     const auto& keys = aggregate_options.keys;
     const auto& segment_keys = aggregate_options.segment_keys;
+    const auto concurrency =
+        plan->query_context()->exec_context()->executor()->GetCapacity();
     if (keys.size() > 0) {
       return Status::Invalid("Scalar aggregation with some key");
     }
-    if (plan->query_context()->exec_context()->executor()->GetCapacity() > 1 &&
-        segment_keys.size() > 0) {
+    if (concurrency > 1 && segment_keys.size() > 0) {
       return Status::NotImplemented("Segmented aggregation in a multi-threaded plan");
     }
 
@@ -406,7 +414,16 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
     ARROW_ASSIGN_OR_RAISE(
         auto args, MakeAggregateNodeArgs(input_schema, keys, segment_keys, aggregates, exec_ctx,
-                                         /*concurrency=*/plan->query_context()->max_concurrency()));
+                                         /*concurrency=*/concurrency));
+
+    if (concurrency > 1) {
+      for (auto& kernel : args.kernels) {
+        if (kernel->ordered) {
+          return Status::NotImplemented(
+              "Using ordered aggregator in multi-threaded execution is not supported");
+        }
+      }
+    }
 
     return plan->EmplaceNode<ScalarAggregateNode>(
         plan, std::move(inputs), std::move(args.output_schema), std::move(args.segmenter),
@@ -599,7 +616,7 @@ class GroupByNode : public ExecNode, public TracedNode {
   static Result<AggregateNodeArgs<HashAggregateKernel>> MakeAggregateNodeArgs(
       const std::shared_ptr<Schema>& input_schema, const std::vector<FieldRef>& keys,
       const std::vector<FieldRef>& segment_keys, const std::vector<Aggregate>& aggs,
-      ExecContext* ctx) {
+      ExecContext* ctx, const int concurrency) {
     // Find input field indices for key fields
     std::vector<int> key_field_ids(keys.size());
     for (size_t i = 0; i < keys.size(); ++i) {
@@ -656,6 +673,20 @@ class GroupByNode : public ExecNode, public TracedNode {
     // Construct aggregates
     ARROW_ASSIGN_OR_RAISE(auto agg_kernels, GetKernels(ctx, aggs, agg_src_types));
 
+    if (concurrency > 1) {
+      if (segment_keys.size() > 0) {
+        return Status::NotImplemented(
+            "Segmented aggregation in a multi-threaded execution context");
+      }
+
+      for (auto kernel : agg_kernels) {
+        if (kernel->ordered) {
+          return Status::NotImplemented(
+              "Using ordered aggregator in multi-threaded execution is not supported");
+        }
+      }
+    }
+
     ARROW_ASSIGN_OR_RAISE(auto agg_states,
                           InitKernels(agg_kernels, ctx, aggs, agg_src_types));
 
@@ -703,18 +734,13 @@ class GroupByNode : public ExecNode, public TracedNode {
     const auto& keys = aggregate_options.keys;
     const auto& segment_keys = aggregate_options.segment_keys;
     auto aggs = aggregate_options.aggregates;
-
-    if (plan->query_context()->exec_context()->executor()->GetCapacity() > 1 &&
-        segment_keys.size() > 0) {
-      return Status::NotImplemented(
-          "Segmented aggregation in a multi-threaded execution 
context"); - } + auto concurrency = plan->query_context()->exec_context()->executor()->GetCapacity(); const auto& input_schema = input->output_schema(); auto exec_ctx = plan->query_context()->exec_context(); - - ARROW_ASSIGN_OR_RAISE(auto args, MakeAggregateNodeArgs(input_schema, keys, - segment_keys, aggs, exec_ctx)); + ARROW_ASSIGN_OR_RAISE( + auto args, MakeAggregateNodeArgs(input_schema, keys, segment_keys, aggs, exec_ctx, + concurrency)); return input->plan()->EmplaceNode( input, std::move(args.output_schema), std::move(args.grouping_key_field_ids), @@ -1042,9 +1068,9 @@ Result> MakeOutputSchema( exec_ctx, /*concurrency=*/1)); return std::move(args.output_schema); } else { - ARROW_ASSIGN_OR_RAISE( - auto args, GroupByNode::MakeAggregateNodeArgs(input_schema, keys, segment_keys, - aggregates, exec_ctx)); + ARROW_ASSIGN_OR_RAISE(auto args, GroupByNode::MakeAggregateNodeArgs( + input_schema, keys, segment_keys, aggregates, + exec_ctx, /*concurrency=*/1)); return std::move(args.output_schema); } } diff --git a/cpp/src/arrow/acero/hash_aggregate_test.cc b/cpp/src/arrow/acero/hash_aggregate_test.cc index dfeae61a6cda6..144098e169414 100644 --- a/cpp/src/arrow/acero/hash_aggregate_test.cc +++ b/cpp/src/arrow/acero/hash_aggregate_test.cc @@ -402,7 +402,7 @@ Result RunGroupBy(const BatchesWithSchema& input, const std::vector& segment_key_names, const std::vector& aggregates, bool use_threads, bool segmented = false, bool naive = false) { - if (segment_key_names.size() > 0) { + if (!use_threads) { ARROW_ASSIGN_OR_RAISE(auto thread_pool, arrow::internal::ThreadPool::Make(1)); ExecContext seq_ctx(default_memory_pool(), thread_pool.get()); return RunGroupBy(input, key_names, segment_key_names, aggregates, &seq_ctx, @@ -4206,6 +4206,256 @@ TEST_P(GroupBy, MinMaxWithNewGroupsInChunkedArray) { /*verbose=*/true); } +TEST_P(GroupBy, FirstLastBasicTypes) { + std::vector> types; + types.insert(types.end(), boolean()); + types.insert(types.end(), NumericTypes().begin(), NumericTypes().end()); + types.insert(types.end(), TemporalTypes().begin(), TemporalTypes().end()); + + const std::vector numeric_table = {R"([ + [1, 1], + [null, 5], + [null, 1], + [null, 7] +])", + R"([ + [0, 2], + [null, 3], + [3, 4], + [5, 4], + [4, null], + [3, 1], + [6, 6], + [5, 5], + [0, 2], + [7, 7] +])", + R"([ + [0, 2], + [1, null], + [6, 5], + [null, 5], + [null, 6], + [null, 3] +])"}; + + const std::string numeric_expected = + R"([ + [1, 1, 3, 1, 3], + [2, 0, 0, 0, 0], + [3, null, null, null, null], + [4, 3, 5, 3, 5], + [5, 5, 6, null, null], + [6, 6, 6, 6, null], + [7, 7, 7, null, 7], + [null, 4, 1, 4, 1] + ])"; + + const std::vector date64_table = {R"([ + [86400000, 1], + [null, 1] +])", + R"([ + [0, 2], + [null, 3], + [259200000, 4], + [432000000, 4], + [345600000, null], + [259200000, 1], + [0, 2] +])", + R"([ + [0, 2], + [86400000, null], + [null, 3] +])"}; + + const std::string date64_expected = + R"([ + [1, 86400000,259200000,86400000,259200000], + [2, 0,0,0,0], + [3, null,null,null,null], + [4, 259200000,432000000,259200000,432000000], + [null, 345600000,86400000,345600000,86400000] + ])"; + + const std::vector boolean_table = {R"([ + [true, 1], + [null, 1] +])", + R"([ + [false, 2], + [null, 3], + [false, 4], + [true, 4], + [true, null], + [false, 1], + [false, 2] +])", + R"([ + [false, 2], + [false, null], + [null, 3] +])"}; + + const std::string boolean_expected = + R"([ + [1, true,false,true,false], + [2, false,false,false,false], + [3, null,null,null,null], + [4, false,true,false,true], + [null, 
true,false,true,false] + ])"; + + auto keep_nulls = std::make_shared(false, 1); + + for (const auto& ty : types) { + SCOPED_TRACE(ty->ToString()); + auto in_schema = schema({field("argument0", ty), field("key", int64())}); + auto table = TableFromJSON(in_schema, (ty->name() == "date64") ? date64_table + : (ty->name() == "bool") ? boolean_table + : numeric_table); + + ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, + GroupByTest( + { + table->GetColumnByName("argument0"), + table->GetColumnByName("argument0"), + table->GetColumnByName("argument0"), + table->GetColumnByName("argument0"), + }, + {table->GetColumnByName("key")}, + { + {"hash_first", nullptr}, + {"hash_last", nullptr}, + {"hash_first", keep_nulls}, + {"hash_last", keep_nulls}, + }, + /*use_threads=*/false)); + ValidateOutput(aggregated_and_grouped); + SortBy({"key_0"}, &aggregated_and_grouped); + + AssertDatumsEqual(ArrayFromJSON(struct_({ + field("key_0", int64()), + field("hash_first", ty), + field("hash_last", ty), + field("hash_first", ty), + field("hash_last", ty), + }), + (ty->name() == "date64") ? date64_expected + : (ty->name() == "bool") ? boolean_expected + : numeric_expected), + aggregated_and_grouped, + /*verbose=*/true); + } +} + +TEST_P(GroupBy, FirstLastBinary) { + // First / last doesn't support multi threaded execution + bool use_threads = false; + for (const auto& ty : BaseBinaryTypes()) { + auto table = TableFromJSON(schema({ + field("argument0", ty), + field("key", int64()), + }), + {R"([ + ["aaaa", 1], + [null, 5], + [null, 1] +])", + R"([ + ["bcd", 2], + [null, 3], + ["2", null], + ["d", 1], + ["ee", 5], + ["bc", 2] +])", + R"([ + ["babcd", 2], + ["123", null], + [null, 5], + [null, 3] +])"}); + + auto keep_nulls = std::make_shared(false, 1); + ASSERT_OK_AND_ASSIGN( + Datum aggregated_and_grouped, + GroupByTest( + {table->GetColumnByName("argument0"), table->GetColumnByName("argument0"), + table->GetColumnByName("argument0"), table->GetColumnByName("argument0")}, + {table->GetColumnByName("key")}, + {{"hash_first", nullptr}, + {"hash_last", nullptr}, + {"hash_first", keep_nulls}, + {"hash_last", keep_nulls}}, + use_threads)); + ValidateOutput(aggregated_and_grouped); + SortBy({"key_0"}, &aggregated_and_grouped); + + AssertDatumsEqual( + ArrayFromJSON(struct_({field("key_0", int64()), field("hash_first", ty), + field("hash_last", ty), field("hash_first", ty), + field("hash_last", ty)}), + R"([ + [1, "aaaa", "d", "aaaa", "d"], + [2, "bcd", "babcd", "bcd", "babcd"], + [3, null, null, null, null], + [5, "ee", "ee", null, null], + [null, "2", "123", "2", "123"] + ])"), + aggregated_and_grouped, + /*verbose=*/true); + } +} + +TEST_P(GroupBy, FirstLastFixedSizeBinary) { + const auto ty = fixed_size_binary(3); + bool use_threads = false; + + auto table = TableFromJSON(schema({ + field("argument0", ty), + field("key", int64()), + }), + {R"([ + ["aaa", 1], + [null, 1] +])", + R"([ + ["bac", 2], + [null, 3], + ["234", null], + ["ddd", 1], + ["bcd", 2] +])", + R"([ + ["bab", 2], + ["123", null], + [null, 3] +])"}); + + ASSERT_OK_AND_ASSIGN( + Datum aggregated_and_grouped, + GroupByTest( + {table->GetColumnByName("argument0"), table->GetColumnByName("argument0")}, + {table->GetColumnByName("key")}, + {{"hash_first", nullptr}, {"hash_last", nullptr}}, use_threads)); + ValidateOutput(aggregated_and_grouped); + SortBy({"key_0"}, &aggregated_and_grouped); + + AssertDatumsEqual( + ArrayFromJSON(struct_({field("key_0", int64()), field("hash_first", ty), + field("hash_last", ty)}), + R"([ + [1, "aaa", "ddd"], + [2, "bac", 
"bab"], + [3, null, null], + [null, "234", "123"] + ])"), + aggregated_and_grouped, + /*verbose=*/true); +} + TEST_P(GroupBy, SmallChunkSizeSumOnly) { auto batch = RecordBatchFromJSON( schema({field("argument", float64()), field("key", int64())}), R"([ @@ -4743,6 +4993,9 @@ void TestSegment(GroupByFunction group_by, const std::shared_ptr& table, is_scalar_aggregate ? "count" : "hash_count", is_scalar_aggregate ? "sum" : "hash_sum", is_scalar_aggregate ? "min_max" : "hash_min_max", + is_scalar_aggregate ? "first_last" : "hash_first_last", + is_scalar_aggregate ? "first" : "hash_first", + is_scalar_aggregate ? "last" : "hash_last", }; ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped, group_by( @@ -4750,12 +5003,18 @@ void TestSegment(GroupByFunction group_by, const std::shared_ptr
& table, table->GetColumnByName("argument"), table->GetColumnByName("argument"), table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), + table->GetColumnByName("argument"), }, keys, segment_keys, { {names[0], nullptr, "agg_0", names[0]}, {names[1], nullptr, "agg_1", names[1]}, {names[2], nullptr, "agg_2", names[2]}, + {names[3], nullptr, "agg_3", names[3]}, + {names[4], nullptr, "agg_4", names[4]}, + {names[5], nullptr, "agg_5", names[5]}, }, /*use_threads=*/false, /*naive=*/false)); @@ -4781,26 +5040,29 @@ Result> GetSingleSegmentInputAsChunked() { {R"([{"argument": 1.0, "key": 1, "segment_key": 1}, {"argument": null, "key": 1, "segment_key": 1} ])", - R"([{"argument": 0.0, "key": 2, "segment_key": 1}, - {"argument": null, "key": 3, "segment_key": 1}, - {"argument": 4.0, "key": null, "segment_key": 1}, - {"argument": 3.25, "key": 1, "segment_key": 1}, - {"argument": 0.125, "key": 2, "segment_key": 1}, - {"argument": -0.25, "key": 2, "segment_key": 1}, - {"argument": 0.75, "key": null, "segment_key": 1}, - {"argument": null, "key": 3, "segment_key": 1} + R"([ + {"argument": 0.0, "key": 2, "segment_key": 1}, + {"argument": null, "key": 3, "segment_key": 1}, + {"argument": 4.0, "key": null, "segment_key": 1}, + {"argument": 3.25, "key": 1, "segment_key": 1}, + {"argument": 0.125, "key": 2, "segment_key": 1}, + {"argument": -0.25, "key": 2, "segment_key": 1}, + {"argument": 0.75, "key": null, "segment_key": 1}, + {"argument": null, "key": 3, "segment_key": 1} ])", - R"([{"argument": 1.0, "key": 1, "segment_key": 0}, - {"argument": null, "key": 1, "segment_key": 0} + R"([ + {"argument": 1.0, "key": 1, "segment_key": 0}, + {"argument": null, "key": 1, "segment_key": 0} ])", - R"([{"argument": 0.0, "key": 2, "segment_key": 0}, - {"argument": null, "key": 3, "segment_key": 0}, - {"argument": 4.0, "key": null, "segment_key": 0}, - {"argument": 3.25, "key": 1, "segment_key": 0}, - {"argument": 0.125, "key": 2, "segment_key": 0}, - {"argument": -0.25, "key": 2, "segment_key": 0}, - {"argument": 0.75, "key": null, "segment_key": 0}, - {"argument": null, "key": 3, "segment_key": 0} + R"([ + {"argument": 0.0, "key": 2, "segment_key": 0}, + {"argument": null, "key": 3, "segment_key": 0}, + {"argument": 4.0, "key": null, "segment_key": 0}, + {"argument": 3.25, "key": 1, "segment_key": 0}, + {"argument": 0.125, "key": 2, "segment_key": 0}, + {"argument": -0.25, "key": 2, "segment_key": 0}, + {"argument": 0.75, "key": null, "segment_key": 0}, + {"argument": null, "key": 3, "segment_key": 0} ])"}); return table; } @@ -4811,20 +5073,26 @@ Result> GetSingleSegmentInputAsCombined() { } Result> GetSingleSegmentScalarOutput() { - return ChunkedArrayFromJSON(struct_({ - field("key_0", int64()), - field("count", int64()), - field("sum", float64()), - field("min_max", struct_({ - field("min", float64()), - field("max", float64()), - })), - }), - {R"([ - [1, 7, 8.875, {"min": -0.25, "max": 4.0}] + return ChunkedArrayFromJSON( + struct_({ + field("key_0", int64()), + field("count", int64()), + field("sum", float64()), + field("min_max", struct_({ + field("min", float64()), + field("max", float64()), + })), + field("first_last", + struct_({field("first", float64()), field("last", float64())})), + field("first", float64()), + field("last", float64()), + }), + {R"([ + [1, 7, 8.875, {"min": -0.25, "max": 4.0}, {"first": 1.0, "last": 0.75}, 1.0, 0.75] ])", - R"([ - [0, 7, 8.875, {"min": -0.25, "max": 4.0}] + R"([ + [0, 7, 8.875, {"min": -0.25, "max": 4.0}, 
{"first": 1.0, "last": 0.75}, 1.0, 0.75] + ])"}); } @@ -4838,18 +5106,24 @@ Result> GetSingleSegmentKeyOutput() { field("min", float64()), field("max", float64()), })), + field("hash_first_last", struct_({ + field("first", float64()), + field("last", float64()), + })), + field("hash_first", float64()), + field("hash_last", float64()), }), {R"([ - [1, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ], - [1, 2, 3, -0.125, {"min": -0.25, "max": 0.125}], - [1, 3, 0, null, {"min": null, "max": null} ], - [1, null, 2, 4.75, {"min": 0.75, "max": 4.0} ] + [1, 1, 2, 4.25, {"min": 1.0, "max": 3.25}, {"first": 1.0, "last": 3.25}, 1.0, 3.25 ], + [1, 2, 3, -0.125, {"min": -0.25, "max": 0.125}, {"first": 0.0, "last": -0.25}, 0.0, -0.25], + [1, 3, 0, null, {"min": null, "max": null}, {"first": null, "last": null}, null, null], + [1, null, 2, 4.75, {"min": 0.75, "max": 4.0}, {"first": 4.0, "last": 0.75}, 4.0, 0.75] ])", R"([ - [0, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ], - [0, 2, 3, -0.125, {"min": -0.25, "max": 0.125}], - [0, 3, 0, null, {"min": null, "max": null} ], - [0, null, 2, 4.75, {"min": 0.75, "max": 4.0} ] + [0, 1, 2, 4.25, {"min": 1.0, "max": 3.25}, {"first": 1.0, "last": 3.25}, 1.0, 3.25 ], + [0, 2, 3, -0.125, {"min": -0.25, "max": 0.125}, {"first": 0.0, "last": -0.25}, 0.0, -0.25], + [0, 3, 0, null, {"min": null, "max": null}, {"first": null, "last": null}, null, null], + [0, null, 2, 4.75, {"min": 0.75, "max": 4.0}, {"first": 4.0, "last": 0.75}, 4.0, 0.75] ])"}); } diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc index d6e18c3bb715e..8ec5c0f70a95d 100644 --- a/cpp/src/arrow/acero/plan_test.cc +++ b/cpp/src/arrow/acero/plan_test.cc @@ -1611,7 +1611,7 @@ TEST(ExecPlanExecution, SegmentedAggregationWithMultiThreading) { {"aggregate", AggregateNodeOptions{/*aggregates=*/{ {"count", nullptr, "i32", "count(i32)"}, }, - /*keys=*/{"i32"}, /*segment_leys=*/{"i32"}}}}); + /*keys=*/{}, /*segment_keys=*/{"i32"}}}}); EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented, HasSubstr("multi-threaded"), DeclarationToExecBatches(std::move(plan))); } diff --git a/cpp/src/arrow/compute/api_aggregate.cc b/cpp/src/arrow/compute/api_aggregate.cc index 8cd3a8d2a3e96..49d8709660684 100644 --- a/cpp/src/arrow/compute/api_aggregate.cc +++ b/cpp/src/arrow/compute/api_aggregate.cc @@ -203,6 +203,16 @@ Result Sum(const Datum& value, const ScalarAggregateOptions& options, return CallFunction("sum", {value}, &options, ctx); } +Result First(const Datum& value, const ScalarAggregateOptions& options, + ExecContext* ctx) { + return CallFunction("first", {value}, &options, ctx); +} + +Result Last(const Datum& value, const ScalarAggregateOptions& options, + ExecContext* ctx) { + return CallFunction("last", {value}, &options, ctx); +} + Result MinMax(const Datum& value, const ScalarAggregateOptions& options, ExecContext* ctx) { return CallFunction("min_max", {value}, &options, ctx); diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h index 97c654266e577..8f45f6199fbe1 100644 --- a/cpp/src/arrow/compute/api_aggregate.h +++ b/cpp/src/arrow/compute/api_aggregate.h @@ -284,6 +284,36 @@ Result Sum( const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(), ExecContext* ctx = NULLPTR); +/// \brief Calculate the first value of an array +/// +/// \param[in] value input datum, expecting Array or ChunkedArray +/// \param[in] options see ScalarAggregateOptions for more information +/// \param[in] ctx the function execution context, optional +/// \return datum of the 
computed first as Scalar +/// +/// \since 13.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result First( + const Datum& value, + const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(), + ExecContext* ctx = NULLPTR); + +/// \brief Calculate the last value of an array +/// +/// \param[in] value input datum, expecting Array or ChunkedArray +/// \param[in] options see ScalarAggregateOptions for more information +/// \param[in] ctx the function execution context, optional +/// \return datum of the computed last as a Scalar +/// +/// \since 13.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result Last( + const Datum& value, + const ScalarAggregateOptions& options = ScalarAggregateOptions::Defaults(), + ExecContext* ctx = NULLPTR); + /// \brief Calculate the min / max of a numeric array /// /// This function returns both the min and max as a struct scalar, with type diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index b71e5a12b504c..66d38ecd64d49 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -323,7 +323,7 @@ TEST(ScalarAggregateFunction, DispatchExact) { std::vector in_args = {int8()}; ScalarAggregateKernel kernel(std::move(in_args), int64(), NoopInit, NoopConsume, - NoopMerge, NoopFinalize); + NoopMerge, NoopFinalize, /*ordered=*/false); ASSERT_OK(func.AddKernel(kernel)); in_args = {float64()}; diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index a642130cd7d89..7e408625b055f 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -644,22 +644,22 @@ using ScalarAggregateFinalize = Status (*)(KernelContext*, Datum*); /// * finalize: produces the end result of the aggregation using the /// KernelState in the KernelContext. struct ARROW_EXPORT ScalarAggregateKernel : public Kernel { - ScalarAggregateKernel() = default; - ScalarAggregateKernel(std::shared_ptr sig, KernelInit init, ScalarAggregateConsume consume, ScalarAggregateMerge merge, - ScalarAggregateFinalize finalize) + ScalarAggregateFinalize finalize, const bool ordered) : Kernel(std::move(sig), std::move(init)), consume(consume), merge(merge), - finalize(finalize) {} + finalize(finalize), + ordered(ordered) {} ScalarAggregateKernel(std::vector in_types, OutputType out_type, KernelInit init, ScalarAggregateConsume consume, - ScalarAggregateMerge merge, ScalarAggregateFinalize finalize) + ScalarAggregateMerge merge, ScalarAggregateFinalize finalize, + const bool ordered) : ScalarAggregateKernel( KernelSignature::Make(std::move(in_types), std::move(out_type)), - std::move(init), consume, merge, finalize) {} + std::move(init), consume, merge, finalize, ordered) {} /// \brief Merge a vector of KernelStates into a single KernelState. /// The merged state will be returned and will be set on the KernelContext. @@ -670,6 +670,14 @@ struct ARROW_EXPORT ScalarAggregateKernel : public Kernel { ScalarAggregateConsume consume; ScalarAggregateMerge merge; ScalarAggregateFinalize finalize; + /// \brief Whether this kernel requires ordering + /// Some aggregations, such as, "first", requires some kind of input order. The + /// order can be implicit, e.g., the order of the input data, or explicit, e.g. + /// the ordering specified with a window aggregation. + /// The caller of the aggregate kernel is responsible for passing data in some + /// defined order to the kernel. 
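+  /// For example, "first" over the chunks [1, 2] and [3, 4] yields 1 only if
+  /// the chunk [1, 2] is consumed before [3, 4].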
The flag here is a way for the kernel to tell + /// the caller that data passed to the kernel must be defined in some order. + bool ordered = false; }; // ---------------------------------------------------------------------- @@ -699,25 +707,31 @@ struct ARROW_EXPORT HashAggregateKernel : public Kernel { HashAggregateKernel(std::shared_ptr sig, KernelInit init, HashAggregateResize resize, HashAggregateConsume consume, - HashAggregateMerge merge, HashAggregateFinalize finalize) + HashAggregateMerge merge, HashAggregateFinalize finalize, + const bool ordered) : Kernel(std::move(sig), std::move(init)), resize(resize), consume(consume), merge(merge), - finalize(finalize) {} + finalize(finalize), + ordered(ordered) {} HashAggregateKernel(std::vector in_types, OutputType out_type, KernelInit init, HashAggregateConsume consume, HashAggregateResize resize, HashAggregateMerge merge, - HashAggregateFinalize finalize) + HashAggregateFinalize finalize, const bool ordered) : HashAggregateKernel( KernelSignature::Make(std::move(in_types), std::move(out_type)), - std::move(init), resize, consume, merge, finalize) {} + std::move(init), resize, consume, merge, finalize, ordered) {} HashAggregateResize resize; HashAggregateConsume consume; HashAggregateMerge merge; HashAggregateFinalize finalize; + /// @brief whether the summarizer requires ordering + /// This is similar to ScalarAggregateKernel. See ScalarAggregateKernel + /// for detailed doc of this variable. + bool ordered = false; }; } // namespace compute diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index cd54776051db0..ddd241652460e 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -46,9 +46,10 @@ Status AggregateFinalize(KernelContext* ctx, Datum* out) { } // namespace void AddAggKernel(std::shared_ptr sig, KernelInit init, - ScalarAggregateFunction* func, SimdLevel::type simd_level) { + ScalarAggregateFunction* func, SimdLevel::type simd_level, + const bool ordered) { ScalarAggregateKernel kernel(std::move(sig), std::move(init), AggregateConsume, - AggregateMerge, AggregateFinalize); + AggregateMerge, AggregateFinalize, ordered); // Set the simd level kernel.simd_level = simd_level; DCHECK_OK(func->AddKernel(std::move(kernel))); @@ -56,9 +57,9 @@ void AddAggKernel(std::shared_ptr sig, KernelInit init, void AddAggKernel(std::shared_ptr sig, KernelInit init, ScalarAggregateFinalize finalize, ScalarAggregateFunction* func, - SimdLevel::type simd_level) { + SimdLevel::type simd_level, const bool ordered) { ScalarAggregateKernel kernel(std::move(sig), std::move(init), AggregateConsume, - AggregateMerge, std::move(finalize)); + AggregateMerge, std::move(finalize), ordered); // Set the simd level kernel.simd_level = simd_level; DCHECK_OK(func->AddKernel(std::move(kernel))); @@ -439,6 +440,45 @@ struct ProductInit { } }; +// ---------------------------------------------------------------------- +// FirstLast implementation + +Result> FirstLastInit(KernelContext* ctx, + const KernelInitArgs& args) { + ARROW_ASSIGN_OR_RAISE(TypeHolder out_type, + args.kernel->signature->out_type().Resolve(ctx, args.inputs)); + + FirstLastInitState visitor(ctx, *args.inputs[0], out_type.GetSharedPtr(), + static_cast(*args.options)); + return visitor.Create(); +} + +// For "first" and "last" functions: override finalize and return the actual value +template +void AddFirstOrLastAggKernel(ScalarAggregateFunction* func, + 
ScalarAggregateFunction* first_last_func) {
+  auto sig = KernelSignature::Make({InputType::Any()}, FirstType);
+  auto init = [first_last_func](
+                  KernelContext* ctx,
+                  const KernelInitArgs& args) -> Result<std::unique_ptr<KernelState>> {
+    ARROW_ASSIGN_OR_RAISE(auto kernel, first_last_func->DispatchExact(args.inputs));
+    KernelInitArgs new_args{kernel, args.inputs, args.options};
+    return kernel->init(ctx, new_args);
+  };
+
+  auto finalize = [](KernelContext* ctx, Datum* out) -> Status {
+    Datum temp;
+    RETURN_NOT_OK(checked_cast<ScalarAggregator*>(ctx->state())->Finalize(ctx, &temp));
+    const auto& result = temp.scalar_as<StructScalar>();
+    DCHECK(result.is_valid);
+    *out = result.value[static_cast<uint8_t>(first_or_last)];
+    return Status::OK();
+  };
+
+  AddAggKernel(std::move(sig), std::move(init), std::move(finalize), func,
+               SimdLevel::NONE, /*ordered=*/true);
+}
+
 // ----------------------------------------------------------------------
 // MinMax implementation
 
@@ -835,6 +875,25 @@ Result<TypeHolder> MinMaxType(KernelContext*, const std::vector<TypeHolder>& typ
 
 }  // namespace
 
+Result<TypeHolder> FirstLastType(KernelContext*, const std::vector<TypeHolder>& types) {
+  auto ty = types.front().GetSharedPtr();
+  return struct_({field("first", ty), field("last", ty)});
+}
+
+void AddFirstLastKernel(KernelInit init, internal::detail::GetTypeId get_id,
+                        ScalarAggregateFunction* func, SimdLevel::type simd_level) {
+  auto sig = KernelSignature::Make({InputType(get_id.id)}, FirstLastType);
+  AddAggKernel(std::move(sig), init, func, simd_level);
+}
+
+void AddFirstLastKernels(KernelInit init,
+                         const std::vector<std::shared_ptr<DataType>>& types,
+                         ScalarAggregateFunction* func) {
+  for (const auto& ty : types) {
+    AddFirstLastKernel(init, ty, func, SimdLevel::NONE);
+  }
+}
+
 void AddMinMaxKernel(KernelInit init, internal::detail::GetTypeId get_id,
                      ScalarAggregateFunction* func, SimdLevel::type simd_level) {
   auto sig = KernelSignature::Make({InputType(get_id.id)}, MinMaxType);
@@ -894,6 +953,30 @@ const FunctionDoc mean_doc{
     {"array"},
     "ScalarAggregateOptions"};
 
+const FunctionDoc first_last_doc{
+    "Compute the first and last values of an array",
+    ("Null values are ignored by default.\n"
+     "If skip_nulls = false, then this will return the first and last values\n"
+     "regardless of whether they are null"),
+    {"array"},
+    "ScalarAggregateOptions"};
+
+const FunctionDoc first_doc{
+    "Compute the first value of an array",
+    ("Null values are ignored by default.\n"
+     "If skip_nulls = false, then this will return the first value\n"
+     "regardless of whether it is null"),
+    {"array"},
+    "ScalarAggregateOptions"};
+
+const FunctionDoc last_doc{
+    "Compute the last value of an array",
+    ("Null values are ignored by default.\n"
+     "If skip_nulls = false, then this will return the last value\n"
+     "regardless of whether it is null"),
+    {"array"},
+    "ScalarAggregateOptions"};
+
 const FunctionDoc min_max_doc{"Compute the minimum and maximum values of a numeric array",
                               ("Null values are ignored by default.\n"
                                "This can be changed through ScalarAggregateOptions."),
@@ -1006,6 +1089,29 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) {
 #endif
   DCHECK_OK(registry->AddFunction(std::move(func)));
 
+  // Add first_last function
+  func = std::make_shared<ScalarAggregateFunction>(
+      "first_last", Arity::Unary(), first_last_doc, &default_scalar_aggregate_options);
+  auto first_last_func = func.get();
+
+  AddFirstLastKernels(FirstLastInit, {boolean(), fixed_size_binary(1)}, func.get());
+  AddFirstLastKernels(FirstLastInit, NumericTypes(), func.get());
+  AddFirstLastKernels(FirstLastInit, BaseBinaryTypes(), func.get());
+  AddFirstLastKernels(FirstLastInit, TemporalTypes(), 
func.get()); + DCHECK_OK(registry->AddFunction(std::move(func))); + + // Add first/last as convience functions + func = std::make_shared("first", Arity::Unary(), first_doc, + &default_scalar_aggregate_options); + AddFirstOrLastAggKernel(func.get(), first_last_func); + DCHECK_OK(registry->AddFunction(std::move(func))); + + func = std::make_shared("last", Arity::Unary(), last_doc, + &default_scalar_aggregate_options); + AddFirstOrLastAggKernel(func.get(), first_last_func); + DCHECK_OK(registry->AddFunction(std::move(func))); + + // Add min max function func = std::make_shared("min_max", Arity::Unary(), min_max_doc, &default_scalar_aggregate_options); AddMinMaxKernels(MinMaxInit, {null(), boolean()}, func.get()); diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index e7254834743ce..3de922531ab19 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -272,8 +272,275 @@ struct MeanKernelInit : public SumLikeInit { }; // ---------------------------------------------------------------------- -// MinMax implementation +// FirstLast implementation +template +struct FirstLastState {}; + +template +struct FirstLastState> { + using ThisType = FirstLastState; + using T = typename ArrowType::c_type; + using ScalarType = typename TypeTraits::ScalarType; + + ThisType& operator+=(const ThisType& rhs) { + this->first = this->has_values ? this->first : rhs.first; + this->first_is_null = this->has_any_values ? this->first_is_null : rhs.first_is_null; + this->last = rhs.has_values ? rhs.last : this->last; + this->last_is_null = rhs.last_is_null; + this->has_values |= rhs.has_values; + this->has_any_values |= rhs.has_any_values; + return *this; + } + + void MergeOne(T value) { + if (!has_values) { + this->first = value; + has_values = true; + } + this->last = value; + } + + T first = false; + T last = false; + bool has_values = false; + bool first_is_null = false; + bool last_is_null = false; + bool has_any_values = false; +}; + +template +struct FirstLastState> { + using ThisType = FirstLastState; + using T = typename ArrowType::c_type; + using ScalarType = typename TypeTraits::ScalarType; + + ThisType& operator+=(const ThisType& rhs) { + this->first = this->has_values ? this->first : rhs.first; + this->first_is_null = this->has_any_values ? this->first_is_null : rhs.first_is_null; + this->last = rhs.has_values ? 
rhs.last : this->last; + this->last_is_null = rhs.last_is_null; + this->has_values |= rhs.has_values; + this->has_any_values |= rhs.has_any_values; + return *this; + } + + void MergeOne(T value) { + if (!has_values) { + this->first = value; + has_values = true; + } + this->last = value; + } + + T first = std::numeric_limits::infinity(); + T last = std::numeric_limits::infinity(); + bool has_values = false; + + // These are updated in ConsumeScalar and ConsumeArray since null values don't + // invoke MergeOne + bool first_is_null = false; + bool last_is_null = false; + // has_any_values indicates whether there is any value (either null or non-null) + // (1) has_any_values = false: There is no value aggregated + // (2) has_any_values = true, has_values = false: There are only null values aggregated + // (3) has_any_values = true, has_values = true: There are both null and non-null values + // aggregated + bool has_any_values = false; +}; + +template +struct FirstLastState> { + using ThisType = FirstLastState; + using T = typename ArrowType::c_type; + using ScalarType = typename TypeTraits::ScalarType; + + ThisType& operator+=(const ThisType& rhs) { + this->first = this->has_values ? this->first : rhs.first; + this->last = rhs.has_values ? rhs.last : this->last; + this->first_is_null = this->has_any_values ? this->first_is_null : rhs.first_is_null; + this->last_is_null = rhs.last_is_null; + this->has_values |= rhs.has_values; + this->has_any_values |= rhs.has_any_values; + return *this; + } + + void MergeOne(T value) { + if (!has_values) { + this->first = value; + has_values = true; + } + last = value; + } + + T first = std::numeric_limits::infinity(); + T last = std::numeric_limits::infinity(); + bool has_values = false; + bool first_is_null = false; + bool last_is_null = false; + bool has_any_values = false; +}; + +template +struct FirstLastState::value || + std::is_same::value>> { + using ThisType = FirstLastState; + using ScalarType = typename TypeTraits::ScalarType; + ThisType& operator+=(const ThisType& rhs) { + this->first = this->has_values ? this->first : rhs.first; + this->last = rhs.has_values ? rhs.last : this->last; + this->first_is_null = this->has_any_values ? 
this->first_is_null : rhs.first_is_null; + this->last_is_null = rhs.last_is_null; + this->has_values |= rhs.has_values; + this->has_any_values |= rhs.has_any_values; + return *this; + } + + void MergeOne(std::string_view value) { + if (!has_values) { + first = std::string(value); + has_values = true; + } + last = std::string(value); + } + + std::string first = ""; + std::string last = ""; + bool has_values = false; + bool first_is_null = false; + bool last_is_null = false; + bool has_any_values = false; +}; + +template +struct FirstLastImpl : public ScalarAggregator { + using ArrayType = typename TypeTraits::ArrayType; + using ThisType = FirstLastImpl; + using StateType = FirstLastState; + + FirstLastImpl(std::shared_ptr out_type, ScalarAggregateOptions options) + : out_type(std::move(out_type)), options(std::move(options)), count(0) { + this->options.min_count = std::max(1, this->options.min_count); + } + + Status Consume(KernelContext*, const ExecSpan& batch) override { + if (batch[0].is_array()) { + return ConsumeArray(batch[0].array); + } + return ConsumeScalar(*batch[0].scalar); + } + + Status ConsumeScalar(const Scalar& scalar) { + this->state.has_any_values = true; + if (scalar.is_valid) { + this->state.MergeOne(internal::UnboxScalar::Unbox(scalar)); + } else { + if (!this->state.has_values) { + this->state.first_is_null = true; + } + } + this->count += scalar.is_valid; + return Status::OK(); + } + + Status ConsumeArray(const ArraySpan& arr_span) { + this->state.has_any_values = true; + ArrayType arr(arr_span.ToArrayData()); + const auto null_count = arr.null_count(); + this->count += arr.length() - null_count; + + if (null_count == 0) { + // If there are no null valus, we can just merge + // the first and last element + this->state.MergeOne(arr.GetView(0)); + this->state.MergeOne(arr.GetView(arr.length() - 1)); + } else { + int64_t first_i = -1; + int64_t last_i = -1; + + if (!this->state.has_values && arr.IsNull(0)) { + this->state.first_is_null = true; + } + + if (arr.IsNull(arr.length() - 1)) { + this->state.last_is_null = true; + } + + // Find the first and last non-null value and update state + for (int64_t i = 0; i < arr.length(); i++) { + if (!arr.IsNull(i)) { + first_i = i; + break; + } + } + if (first_i >= 0) { + for (int64_t i = arr.length() - 1; i >= 0; i--) { + if (!arr.IsNull(i)) { + last_i = i; + break; + } + } + DCHECK_GE(last_i, first_i); + this->state.MergeOne(arr.GetView(first_i)); + this->state.MergeOne(arr.GetView(last_i)); + } + } + + return Status::OK(); + } + + Status MergeFrom(KernelContext*, KernelState&& src) override { + const auto& other = checked_cast(src); + this->state += other.state; + this->count += other.count; + return Status::OK(); + } + + Status Finalize(KernelContext*, Datum* out) override { + const auto& struct_type = checked_cast(*out_type); + const auto& child_type = struct_type.field(0)->type(); + auto null_scalar = MakeNullScalar(child_type); + + std::vector> values; + + if (this->count < options.min_count) { + values = {null_scalar, null_scalar}; + } else { + if (state.has_values) { + if (options.skip_nulls) { + ARROW_ASSIGN_OR_RAISE(auto first_scalar, MakeScalar(child_type, state.first)); + ARROW_ASSIGN_OR_RAISE(auto last_scalar, MakeScalar(child_type, state.last)); + values = {first_scalar, last_scalar}; + } else { + ARROW_ASSIGN_OR_RAISE( + auto first_scalar, + state.first_is_null ? null_scalar : MakeScalar(child_type, state.first)); + ARROW_ASSIGN_OR_RAISE( + auto last_scalar, + state.last_is_null ? 
null_scalar : MakeScalar(child_type, state.last)); + + values = {first_scalar, last_scalar}; + } + } else { + // If there is no non-null values, we always output null regardless of + // skip_null + values = {null_scalar, null_scalar}; + } + } + + out->value = std::make_shared(std::move(values), this->out_type); + return Status::OK(); + } + + std::shared_ptr out_type; + ScalarAggregateOptions options; + int64_t count; + FirstLastState state; +}; + +// ---------------------------------------------------------------------- +// MinMax implementation template struct MinMaxState {}; @@ -624,6 +891,65 @@ struct NullMinMaxImpl : public ScalarAggregator { } }; +// First/Last + +struct FirstLastInitState { + std::unique_ptr state; + KernelContext* ctx; + const DataType& in_type; + std::shared_ptr out_type; + const ScalarAggregateOptions& options; + + FirstLastInitState(KernelContext* ctx, const DataType& in_type, + const std::shared_ptr& out_type, + const ScalarAggregateOptions& options) + : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {} + + Status Visit(const DataType& ty) { + return Status::NotImplemented("No first/last implemented for ", ty); + } + + Status Visit(const HalfFloatType& ty) { + return Status::NotImplemented("No first/last implemented for ", ty); + } + + Status Visit(const BooleanType&) { + state.reset(new FirstLastImpl(out_type, options)); + return Status::OK(); + } + + template + enable_if_physical_integer Visit(const Type&) { + using PhysicalType = typename Type::PhysicalType; + state.reset(new FirstLastImpl(out_type, options)); + return Status::OK(); + } + + template + enable_if_physical_floating_point Visit(const Type&) { + using PhysicalType = typename Type::PhysicalType; + state.reset(new FirstLastImpl(out_type, options)); + return Status::OK(); + } + + template + enable_if_base_binary Visit(const Type&) { + state.reset(new FirstLastImpl(out_type, options)); + return Status::OK(); + } + + template + enable_if_t::value, Status> Visit(const Type&) { + state.reset(new FirstLastImpl(out_type, options)); + return Status::OK(); + } + + Result> Create() { + RETURN_NOT_OK(VisitTypeInline(in_type, this)); + return std::move(state); + } +}; + template struct MinMaxInitState { std::unique_ptr state; diff --git a/cpp/src/arrow/compute/kernels/aggregate_internal.h b/cpp/src/arrow/compute/kernels/aggregate_internal.h index 8fd67485d7fb1..168f063c770f3 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_internal.h @@ -100,17 +100,21 @@ struct ScalarAggregator : public KernelState { // kernel implementations together enum class VarOrStd : bool { Var, Std }; +// Helper to differentiate between first/last calculation so we can fold +// kernel implementations together +enum class FirstOrLast : bool { First, Last }; + // Helper to differentiate between min/max calculation so we can fold // kernel implementations together enum class MinOrMax : uint8_t { Min = 0, Max }; void AddAggKernel(std::shared_ptr sig, KernelInit init, ScalarAggregateFunction* func, - SimdLevel::type simd_level = SimdLevel::NONE); + SimdLevel::type simd_level = SimdLevel::NONE, bool ordered = false); void AddAggKernel(std::shared_ptr sig, KernelInit init, ScalarAggregateFinalize finalize, ScalarAggregateFunction* func, - SimdLevel::type simd_level = SimdLevel::NONE); + SimdLevel::type simd_level = SimdLevel::NONE, bool ordered = false); using arrow::internal::VisitSetBitRunsVoid; diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc 
b/cpp/src/arrow/compute/kernels/aggregate_test.cc index c7ae70e21083c..2a7ba1a51e433 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -1501,6 +1501,211 @@ TEST(TestNullMeanKernel, Basics) { ResultWith(null_result)); } +// +// First / Last +// + +template +class TestFirstLastKernel : public ::testing::Test { + using Traits = TypeTraits; + using ArrayType = typename Traits::ArrayType; + using c_type = typename ArrowType::c_type; + using ScalarType = typename Traits::ScalarType; + + public: + void AssertFirstLastIs(const Datum& array, c_type expected_first, c_type expected_last, + const ScalarAggregateOptions& options) { + ASSERT_OK_AND_ASSIGN(Datum out, CallFunction("first", {array}, &options)); + const auto& out_first = out.scalar_as(); + ASSERT_EQ(expected_first, out_first.value); + + ASSERT_OK_AND_ASSIGN(out, CallFunction("last", {array}, &options)); + const auto& out_last = out.scalar_as(); + ASSERT_EQ(expected_last, out_last.value); + } + + void AssertFirstLastIsNull(const Datum& array, const ScalarAggregateOptions& options) { + ASSERT_OK_AND_ASSIGN(Datum out, First(array, options)); + const auto& out_first = out.scalar_as(); + ASSERT_FALSE(out_first.is_valid); + + ASSERT_OK_AND_ASSIGN(out, Last(array, options)); + const auto& out_last = out.scalar_as(); + ASSERT_FALSE(out_last.is_valid); + } + + void AssertFirstLastIsNull(const std::string& json, + const ScalarAggregateOptions& options) { + auto array = ArrayFromJSON(type_singleton(), json); + AssertFirstLastIsNull(array, options); + } + + void AssertFirstLastIsNull(const std::vector& json, + const ScalarAggregateOptions& options) { + auto array = ChunkedArrayFromJSON(type_singleton(), json); + AssertFirstLastIsNull(array, options); + } + + void AssertFirstLastIs(const std::string& json, c_type expected_first, + c_type expected_last, const ScalarAggregateOptions& options) { + auto array = ArrayFromJSON(type_singleton(), json); + AssertFirstLastIs(array, expected_first, expected_last, options); + } + + void AssertFirstLastIs(const std::vector& json, c_type expected_min, + c_type expected_max, const ScalarAggregateOptions& options) { + auto array = ChunkedArrayFromJSON(type_singleton(), json); + AssertFirstLastIs(array, expected_min, expected_max, options); + } + + std::shared_ptr type_singleton() { + return default_type_instance(); + } +}; + +class TestBooleanFirstLastKernel : public TestFirstLastKernel {}; + +template +class TestNumericFirstLastKernel : public TestFirstLastKernel {}; + +template +class TestTemporalFirstLastKernel : public TestFirstLastKernel {}; + +TEST_F(TestBooleanFirstLastKernel, Basics) { + ScalarAggregateOptions options; + std::vector chunked_input0 = {"[]", "[]"}; + std::vector chunked_input1 = {"[null, true, null]", "[true, null]"}; + std::vector chunked_input2 = {"[false, false, false]", "[false]"}; + std::vector chunked_input3 = {"[null, true]", "[false, null]"}; + std::vector chunked_input4 = {"[false, null]", "[null, true]"}; + auto ty = struct_({field("first", boolean()), field("last", boolean())}); + + this->AssertFirstLastIsNull("[]", options); + this->AssertFirstLastIsNull("[null, null, null]", options); + this->AssertFirstLastIsNull(chunked_input0, options); + this->AssertFirstLastIs(chunked_input1, true, true, options); + this->AssertFirstLastIs(chunked_input2, false, false, options); + this->AssertFirstLastIs(chunked_input3, true, false, options); + this->AssertFirstLastIs(chunked_input4, false, true, options); + + options.skip_nulls 
= false; + this->AssertFirstLastIsNull("[]", options); + this->AssertFirstLastIsNull("[null, null, null]", options); + this->AssertFirstLastIsNull(chunked_input0, options); + this->AssertFirstLastIsNull(chunked_input1, options); + this->AssertFirstLastIs(chunked_input2, false, false, options); + this->AssertFirstLastIsNull(chunked_input3, options); + this->AssertFirstLastIs(chunked_input4, false, true, options); +} + +TYPED_TEST_SUITE(TestNumericFirstLastKernel, NumericArrowTypes); +TYPED_TEST(TestNumericFirstLastKernel, Basics) { + ScalarAggregateOptions options; + std::vector chunked_input1 = {"[5, 1, 2, 3, 4]", "[9, 8, null, 3, 4]"}; + std::vector chunked_input2 = {"[null, null, null, 7]", + "[null, 8, null, 3, 4, null]"}; + std::vector chunked_input3 = {"[null, null, null]", "[null, null]"}; + auto item_ty = default_type_instance(); + + this->AssertFirstLastIs("[5, 1, 2, 3, 4]", 5, 4, options); + this->AssertFirstLastIs("[5, null, 2, 3, null]", 5, 3, options); + this->AssertFirstLastIs(chunked_input1, 5, 4, options); + this->AssertFirstLastIs(chunked_input1[1], 9, 4, options); + this->AssertFirstLastIs(chunked_input2, 7, 4, options); + this->AssertFirstLastIsNull(chunked_input3[0], options); + this->AssertFirstLastIsNull(chunked_input3, options); + + options.skip_nulls = false; + this->AssertFirstLastIsNull(chunked_input2[1], options); + this->AssertFirstLastIsNull(chunked_input2, options); +} + +TYPED_TEST_SUITE(TestTemporalFirstLastKernel, TemporalArrowTypes); +TYPED_TEST(TestTemporalFirstLastKernel, Basics) { + ScalarAggregateOptions options; + std::vector chunked_input1 = {"[5, 1, 2, 3, 4]", "[9, 8, null, 3, 4]"}; + std::vector chunked_input2 = {"[null, null, null, null]", + "[null, 8, null, 3 ,4, null]"}; + auto item_ty = default_type_instance(); + + this->AssertFirstLastIs("[5, 1, 2, 3, 4]", 5, 4, options); + this->AssertFirstLastIs("[5, null, 2, 3, null]", 5, 3, options); + this->AssertFirstLastIs(chunked_input1, 5, 4, options); + this->AssertFirstLastIs(chunked_input2, 8, 4, options); + + options.skip_nulls = false; + this->AssertFirstLastIsNull(chunked_input2, options); +} + +template +class TestBaseBinaryFirstLastKernel : public ::testing::Test {}; +TYPED_TEST_SUITE(TestBaseBinaryFirstLastKernel, BaseBinaryArrowTypes); +TYPED_TEST(TestBaseBinaryFirstLastKernel, Basics) { + std::vector chunked_input1 = {R"(["cc", "", "aa", "b", "c"])", + R"(["d", "", null, "b", null])"}; + std::vector chunked_input2 = {R"(["aa", null, "aa", "b", "c"])", + R"(["d", "", "aa", "b", "bb"])"}; + std::vector chunked_input3 = {R"(["bb", "", "aa", "b", null])", + R"(["d", "", null, "b", "aa"])"}; + auto ty = std::make_shared(); + Datum null = ScalarFromJSON(ty, R"(null)"); + + // SKIP nulls by default + EXPECT_THAT(First(ArrayFromJSON(ty, R"([])")), ResultWith(null)); + EXPECT_THAT(First(ArrayFromJSON(ty, R"([null, null, null])")), ResultWith(null)); + EXPECT_THAT(First(ArrayFromJSON(ty, chunked_input1[0])), + ResultWith(ScalarFromJSON(ty, R"("cc")"))); + EXPECT_THAT(First(ChunkedArrayFromJSON(ty, chunked_input1)), + ResultWith(ScalarFromJSON(ty, R"("cc")"))); + EXPECT_THAT(First(ChunkedArrayFromJSON(ty, chunked_input3)), + ResultWith(ScalarFromJSON(ty, R"("bb")"))); + + EXPECT_THAT(Last(ArrayFromJSON(ty, R"([])")), ResultWith(null)); + EXPECT_THAT(Last(ArrayFromJSON(ty, R"([null, null, null])")), ResultWith(null)); + EXPECT_THAT(Last(ArrayFromJSON(ty, chunked_input1[0])), + ResultWith(ScalarFromJSON(ty, R"("c")"))); + EXPECT_THAT(Last(ChunkedArrayFromJSON(ty, chunked_input1)), + 
ResultWith(ScalarFromJSON(ty, R"("b")"))); + EXPECT_THAT(Last(ChunkedArrayFromJSON(ty, chunked_input3)), + ResultWith(ScalarFromJSON(ty, R"("aa")"))); + + EXPECT_THAT(Last(MakeNullScalar(ty)), ResultWith(null)); +} + +TEST(TestFixedSizeBinaryFirstLastKernel, Basics) { + auto ty = fixed_size_binary(2); + std::vector chunked_input1 = {R"(["cd", "aa", "ab", "bb", "cc"])", + R"(["da", "aa", null, "bb", "bb"])"}; + std::vector chunked_input2 = {R"([null, null, null, null, null])", + R"(["dd", "aa", "ab", "bb", "aa"])"}; + std::vector chunked_input3 = {R"(["aa", "aa", "ab", "bb", null])", + R"([null, null, null, null, null])"}; + Datum null = ScalarFromJSON(ty, R"(null)"); + + // SKIP nulls by default + EXPECT_THAT(First(ArrayFromJSON(ty, R"([])")), ResultWith(null)); + EXPECT_THAT(First(ArrayFromJSON(ty, R"([null, null, null])")), ResultWith(null)); + EXPECT_THAT(First(ArrayFromJSON(ty, chunked_input1[0])), + ResultWith(ScalarFromJSON(ty, R"("cd")"))); + EXPECT_THAT(First(ChunkedArrayFromJSON(ty, chunked_input1)), + ResultWith(ScalarFromJSON(ty, R"("cd")"))); + EXPECT_THAT(First(ChunkedArrayFromJSON(ty, chunked_input2)), + ResultWith(ScalarFromJSON(ty, R"("dd")"))); + EXPECT_THAT(First(ChunkedArrayFromJSON(ty, chunked_input3)), + ResultWith(ScalarFromJSON(ty, R"("aa")"))); + + EXPECT_THAT(Last(ArrayFromJSON(ty, R"([])")), ResultWith(null)); + EXPECT_THAT(Last(ArrayFromJSON(ty, R"([null, null, null])")), ResultWith(null)); + EXPECT_THAT(Last(ArrayFromJSON(ty, chunked_input1[0])), + ResultWith(ScalarFromJSON(ty, R"("cc")"))); + EXPECT_THAT(Last(ChunkedArrayFromJSON(ty, chunked_input1)), + ResultWith(ScalarFromJSON(ty, R"("bb")"))); + EXPECT_THAT(Last(ChunkedArrayFromJSON(ty, chunked_input2)), + ResultWith(ScalarFromJSON(ty, R"("aa")"))); + EXPECT_THAT(Last(ChunkedArrayFromJSON(ty, chunked_input3)), + ResultWith(ScalarFromJSON(ty, R"("bb")"))); +} + // // Min / Max // diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 380dde016ef47..4242680adb84f 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -105,22 +105,19 @@ Result ResolveGroupOutputType(KernelContext* ctx, } HashAggregateKernel MakeKernel(std::shared_ptr signature, - KernelInit init) { - HashAggregateKernel kernel; - kernel.init = std::move(init); - kernel.signature = std::move(signature); - kernel.resize = HashAggregateResize; - kernel.consume = HashAggregateConsume; - kernel.merge = HashAggregateMerge; - kernel.finalize = HashAggregateFinalize; + KernelInit init, const bool ordered = false) { + HashAggregateKernel kernel(std::move(signature), std::move(init), HashAggregateResize, + HashAggregateConsume, HashAggregateMerge, + HashAggregateFinalize, ordered); return kernel; } -HashAggregateKernel MakeKernel(InputType argument_type, KernelInit init) { +HashAggregateKernel MakeKernel(InputType argument_type, KernelInit init, + const bool ordered = false) { return MakeKernel( KernelSignature::Make({std::move(argument_type), InputType(Type::UINT32)}, OutputType(ResolveGroupOutputType)), - std::move(init)); + std::move(init), ordered); } HashAggregateKernel MakeUnaryKernel(KernelInit init) { @@ -1695,6 +1692,500 @@ struct GroupedMinMaxFactory { InputType argument_type; }; +// ---------------------------------------------------------------------- +// FirstLast implementation + +template +struct GroupedFirstLastImpl final : public GroupedAggregator { + using CType = typename TypeTraits::CType; + using GetSet = 
GroupedValueTraits; + using ArrType = + typename std::conditional::value, uint8_t, CType>::type; + + Status Init(ExecContext* ctx, const KernelInitArgs& args) override { + options_ = *checked_cast(args.options); + + // First and last non-null values + firsts_ = TypedBufferBuilder(ctx->memory_pool()); + lasts_ = TypedBufferBuilder(ctx->memory_pool()); + + // Whether the first/last element is null + first_is_nulls_ = TypedBufferBuilder(ctx->memory_pool()); + last_is_nulls_ = TypedBufferBuilder(ctx->memory_pool()); + + has_values_ = TypedBufferBuilder(ctx->memory_pool()); + has_any_values_ = TypedBufferBuilder(ctx->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; + num_groups_ = new_num_groups; + // Reusing AntiExtrema as uninitialized value here because it doesn't + // matter what the value is. We never output the uninitialized + // first/last value. + RETURN_NOT_OK(firsts_.Append(added_groups, AntiExtrema::anti_min())); + RETURN_NOT_OK(lasts_.Append(added_groups, AntiExtrema::anti_max())); + RETURN_NOT_OK(has_values_.Append(added_groups, false)); + RETURN_NOT_OK(first_is_nulls_.Append(added_groups, false)); + RETURN_NOT_OK(last_is_nulls_.Append(added_groups, false)); + RETURN_NOT_OK(has_any_values_.Append(added_groups, false)); + return Status::OK(); + } + + Status Consume(const ExecSpan& batch) override { + auto raw_firsts = firsts_.mutable_data(); + auto raw_lasts = lasts_.mutable_data(); + auto raw_has_values = has_values_.mutable_data(); + auto raw_has_any_values = has_any_values_.mutable_data(); + auto raw_first_is_nulls = first_is_nulls_.mutable_data(); + auto raw_last_is_nulls = last_is_nulls_.mutable_data(); + + VisitGroupedValues( + batch, + [&](uint32_t g, CType val) { + if (!bit_util::GetBit(raw_has_values, g)) { + GetSet::Set(raw_firsts, g, val); + bit_util::SetBit(raw_has_values, g); + bit_util::SetBit(raw_has_any_values, g); + } + // No not need to set first_is_nulls because + // Once first_is_nulls is set to true it never + // changes + bit_util::SetBitTo(raw_last_is_nulls, g, false); + GetSet::Set(raw_lasts, g, val); + DCHECK(bit_util::GetBit(raw_has_values, g)); + }, + [&](uint32_t g) { + // We update first_is_null to true if this is called + // before we see any non-null values + if (!bit_util::GetBit(raw_has_values, g)) { + bit_util::SetBit(raw_first_is_nulls, g); + bit_util::SetBit(raw_has_any_values, g); + } + bit_util::SetBit(raw_last_is_nulls, g); + }); + return Status::OK(); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + // The merge is asymmetric. "first" from this state gets pick over "first" from other + // state. "last" from other state gets pick over from this state. This is so that when + // using with segmeneted aggregation, we still get the correct "first" and "last" + // value for the entire segement. 
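+    // For example, merging a state where a group holds {first=1, last=2} with a
+    // later state where it holds {first=3, last=4} yields {first=1, last=4}.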
+ auto other = checked_cast(&raw_other); + + auto raw_firsts = firsts_.mutable_data(); + auto raw_lasts = lasts_.mutable_data(); + auto raw_has_values = has_values_.mutable_data(); + auto raw_has_any_values = has_any_values_.mutable_data(); + auto raw_first_is_nulls = first_is_nulls_.mutable_data(); + auto raw_last_is_nulls = last_is_nulls_.mutable_data(); + + auto other_raw_firsts = other->firsts_.mutable_data(); + auto other_raw_lasts = other->lasts_.mutable_data(); + auto other_raw_has_values = other->has_values_.mutable_data(); + auto other_raw_has_any_values = other->has_values_.mutable_data(); + auto other_raw_last_is_nulls = other->last_is_nulls_.mutable_data(); + + auto g = group_id_mapping.GetValues(1); + + for (uint32_t other_g = 0; static_cast(other_g) < group_id_mapping.length; + ++other_g, ++g) { + if (!bit_util::GetBit(raw_has_values, *g)) { + if (bit_util::GetBit(other_raw_has_values, other_g)) { + GetSet::Set(raw_firsts, *g, GetSet::Get(other_raw_firsts, other_g)); + } + } + if (bit_util::GetBit(other_raw_has_values, other_g)) { + GetSet::Set(raw_lasts, *g, GetSet::Get(other_raw_lasts, other_g)); + } + // If the current state doesn't have any nulls (null or non-null), then + // We take the "first_is_null" from rhs + if (!bit_util::GetBit(raw_has_any_values, *g)) { + bit_util::SetBitTo(raw_first_is_nulls, *g, + bit_util::GetBit(other->first_is_nulls_.data(), other_g)); + } + if (bit_util::GetBit(other_raw_last_is_nulls, other_g)) { + bit_util::SetBit(raw_last_is_nulls, *g); + } + + if (bit_util::GetBit(other_raw_has_values, other_g)) { + bit_util::SetBit(raw_has_values, *g); + } + + if (bit_util::GetBit(other_raw_has_any_values, other_g)) { + bit_util::SetBit(raw_has_any_values, *g); + } + } + return Status::OK(); + } + + Result Finalize() override { + // We initialize the null bitmap with first_is_nulls and last_is_nulls + // then update it depending on has_values + ARROW_ASSIGN_OR_RAISE(auto first_null_bitmap, first_is_nulls_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto last_null_bitmap, last_is_nulls_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto has_values, has_values_.Finish()); + + auto raw_first_null_bitmap = first_null_bitmap->mutable_data(); + auto raw_last_null_bitmap = last_null_bitmap->mutable_data(); + auto raw_has_values = has_values->data(); + + if (options_.skip_nulls) { + for (int i = 0; i < num_groups_; i++) { + const bool has_value = bit_util::GetBit(has_values->data(), i); + bit_util::SetBitTo(raw_first_null_bitmap, i, has_value); + bit_util::SetBitTo(raw_last_null_bitmap, i, has_value); + } + } else { + for (int i = 0; i < num_groups_; i++) { + // If first is null, we set the mask to false to output null + if (bit_util::GetBit(raw_first_null_bitmap, i)) { + bit_util::SetBitTo(raw_first_null_bitmap, i, false); + } else { + bit_util::SetBitTo(raw_first_null_bitmap, i, + bit_util::GetBit(raw_has_values, i)); + } + } + for (int i = 0; i < num_groups_; i++) { + // If last is null, we set the mask to false to output null + if (bit_util::GetBit(raw_last_null_bitmap, i)) { + bit_util::SetBitTo(raw_last_null_bitmap, i, false); + } else { + bit_util::SetBitTo(raw_last_null_bitmap, i, + bit_util::GetBit(raw_has_values, i)); + } + } + } + + auto firsts = + ArrayData::Make(type_, num_groups_, {std::move(first_null_bitmap), nullptr}); + auto lasts = + ArrayData::Make(type_, num_groups_, {std::move(last_null_bitmap), nullptr}); + ARROW_ASSIGN_OR_RAISE(firsts->buffers[1], firsts_.Finish()); + ARROW_ASSIGN_OR_RAISE(lasts->buffers[1], lasts_.Finish()); + + return 
ArrayData::Make(out_type(), num_groups_, {nullptr}, + {std::move(firsts), std::move(lasts)}); + } + + std::shared_ptr out_type() const override { + return struct_({field("first", type_), field("last", type_)}); + } + + int64_t num_groups_; + TypedBufferBuilder firsts_, lasts_; + // has_values is true if there is non-null values + // has_any_values is true if there is either null or non-null values + TypedBufferBuilder has_values_, has_any_values_, first_is_nulls_, last_is_nulls_; + std::shared_ptr type_; + ScalarAggregateOptions options_; +}; + +template +struct GroupedFirstLastImpl::value || + std::is_same::value>> + final : public GroupedAggregator { + using Allocator = arrow::stl::allocator; + using StringType = std::basic_string, Allocator>; + + Status Init(ExecContext* ctx, const KernelInitArgs& args) override { + ctx_ = ctx; + allocator_ = Allocator(ctx->memory_pool()); + options_ = *checked_cast(args.options); + // type_ initialized by FirstLastInit + // Whether the first/last element is null + first_is_nulls_ = TypedBufferBuilder(ctx->memory_pool()); + last_is_nulls_ = TypedBufferBuilder(ctx->memory_pool()); + has_values_ = TypedBufferBuilder(ctx->memory_pool()); + has_any_values_ = TypedBufferBuilder(ctx->memory_pool()); + return Status::OK(); + } + + Status Resize(int64_t new_num_groups) override { + auto added_groups = new_num_groups - num_groups_; + DCHECK_GE(added_groups, 0); + num_groups_ = new_num_groups; + firsts_.resize(new_num_groups); + lasts_.resize(new_num_groups); + RETURN_NOT_OK(has_values_.Append(added_groups, false)); + RETURN_NOT_OK(has_any_values_.Append(added_groups, false)); + RETURN_NOT_OK(first_is_nulls_.Append(added_groups, false)); + RETURN_NOT_OK(last_is_nulls_.Append(added_groups, false)); + return Status::OK(); + } + + Status Consume(const ExecSpan& batch) override { + auto raw_has_values = has_values_.mutable_data(); + auto raw_has_any_values = has_any_values_.mutable_data(); + auto raw_first_is_nulls = first_is_nulls_.mutable_data(); + auto raw_last_is_nulls = last_is_nulls_.mutable_data(); + + return VisitGroupedValues( + batch, + [&](uint32_t g, std::string_view val) { + if (!firsts_[g]) { + firsts_[g].emplace(val.data(), val.size(), allocator_); + bit_util::SetBit(raw_has_values, g); + bit_util::SetBit(raw_has_any_values, g); + } + bit_util::SetBitTo(raw_last_is_nulls, g, false); + lasts_[g].emplace(val.data(), val.size(), allocator_); + return Status::OK(); + }, + [&](uint32_t g) { + if (!bit_util::GetBit(raw_has_values, g)) { + bit_util::SetBit(raw_first_is_nulls, g); + bit_util::SetBit(raw_has_any_values, g); + } + bit_util::SetBit(raw_last_is_nulls, g); + return Status::OK(); + }); + } + + Status Merge(GroupedAggregator&& raw_other, + const ArrayData& group_id_mapping) override { + auto other = checked_cast(&raw_other); + auto g = group_id_mapping.GetValues(1); + for (uint32_t other_g = 0; static_cast(other_g) < group_id_mapping.length; + ++other_g, ++g) { + if (!firsts_[*g]) { + firsts_[*g] = std::move(other->firsts_[other_g]); + } + lasts_[*g] = std::move(other->lasts_[other_g]); + + if (!bit_util::GetBit(has_any_values_.data(), *g)) { + bit_util::SetBitTo(first_is_nulls_.mutable_data(), *g, + bit_util::GetBit(other->first_is_nulls_.data(), other_g)); + } + if (bit_util::GetBit(other->last_is_nulls_.data(), other_g)) { + bit_util::SetBit(last_is_nulls_.mutable_data(), *g); + } + if (bit_util::GetBit(other->has_values_.data(), other_g)) { + bit_util::SetBit(has_values_.mutable_data(), *g); + } + if 
(bit_util::GetBit(other->has_any_values_.data(), other_g)) { + bit_util::SetBit(has_any_values_.mutable_data(), *g); + } + } + return Status::OK(); + } + + Result Finalize() override { + ARROW_ASSIGN_OR_RAISE(auto first_null_bitmap, first_is_nulls_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto last_null_bitmap, last_is_nulls_.Finish()); + ARROW_ASSIGN_OR_RAISE(auto has_values, has_values_.Finish()); + + if (!options_.skip_nulls) { + for (int i = 0; i < num_groups_; i++) { + const bool first_is_null = bit_util::GetBit(first_null_bitmap->data(), i); + const bool has_value = bit_util::GetBit(has_values->data(), i); + if (first_is_null) { + bit_util::SetBitTo(first_null_bitmap->mutable_data(), i, false); + } else { + bit_util::SetBitTo(first_null_bitmap->mutable_data(), i, has_value); + } + } + + for (int i = 0; i < num_groups_; i++) { + const bool last_is_null = bit_util::GetBit(last_null_bitmap->data(), i); + const bool has_value = bit_util::GetBit(has_values->data(), i); + if (last_is_null) { + bit_util::SetBitTo(last_null_bitmap->mutable_data(), i, false); + } else { + bit_util::SetBitTo(last_null_bitmap->mutable_data(), i, has_value); + } + } + } else { + for (int i = 0; i < num_groups_; i++) { + const bool has_value = bit_util::GetBit(has_values->data(), i); + bit_util::SetBitTo(first_null_bitmap->mutable_data(), i, has_value); + bit_util::SetBitTo(last_null_bitmap->mutable_data(), i, has_value); + } + } + + auto firsts = + ArrayData::Make(type_, num_groups_, {std::move(first_null_bitmap), nullptr}); + auto lasts = + ArrayData::Make(type_, num_groups_, {std::move(last_null_bitmap), nullptr}); + RETURN_NOT_OK(MakeOffsetsValues(firsts.get(), firsts_)); + RETURN_NOT_OK(MakeOffsetsValues(lasts.get(), lasts_)); + return ArrayData::Make(out_type(), num_groups_, {nullptr}, + {std::move(firsts), std::move(lasts)}); + } + + template + enable_if_base_binary MakeOffsetsValues( + ArrayData* array, const std::vector>& values) { + using offset_type = typename T::offset_type; + ARROW_ASSIGN_OR_RAISE( + auto raw_offsets, + AllocateBuffer((1 + values.size()) * sizeof(offset_type), ctx_->memory_pool())); + offset_type* offsets = reinterpret_cast(raw_offsets->mutable_data()); + offsets[0] = 0; + offsets++; + const uint8_t* null_bitmap = array->buffers[0]->data(); + offset_type total_length = 0; + for (size_t i = 0; i < values.size(); i++) { + if (bit_util::GetBit(null_bitmap, i)) { + const std::optional& value = values[i]; + DCHECK(value.has_value()); + if (value->size() > + static_cast(std::numeric_limits::max()) || + arrow::internal::AddWithOverflow( + total_length, static_cast(value->size()), &total_length)) { + return Status::Invalid("Result is too large to fit in ", *array->type, + " cast to large_ variant of type"); + } + } + offsets[i] = total_length; + } + ARROW_ASSIGN_OR_RAISE(auto data, AllocateBuffer(total_length, ctx_->memory_pool())); + int64_t offset = 0; + for (size_t i = 0; i < values.size(); i++) { + if (bit_util::GetBit(null_bitmap, i)) { + const std::optional& value = values[i]; + DCHECK(value.has_value()); + std::memcpy(data->mutable_data() + offset, value->data(), value->size()); + offset += value->size(); + } + } + array->buffers[1] = std::move(raw_offsets); + array->buffers.push_back(std::move(data)); + return Status::OK(); + } + + template + enable_if_same MakeOffsetsValues( + ArrayData* array, const std::vector>& values) { + const uint8_t* null_bitmap = array->buffers[0]->data(); + const int32_t slot_width = + checked_cast(*array->type).byte_width(); + int64_t total_length = 
values.size() * slot_width; + ARROW_ASSIGN_OR_RAISE(auto data, AllocateBuffer(total_length, ctx_->memory_pool())); + int64_t offset = 0; + for (size_t i = 0; i < values.size(); i++) { + if (bit_util::GetBit(null_bitmap, i)) { + const std::optional& value = values[i]; + DCHECK(value.has_value()); + std::memcpy(data->mutable_data() + offset, value->data(), slot_width); + } else { + std::memset(data->mutable_data() + offset, 0x00, slot_width); + } + offset += slot_width; + } + array->buffers[1] = std::move(data); + return Status::OK(); + } + + std::shared_ptr out_type() const override { + return struct_({field("first", type_), field("last", type_)}); + } + + ExecContext* ctx_; + Allocator allocator_; + int64_t num_groups_; + std::vector> firsts_, lasts_; + TypedBufferBuilder has_values_, has_any_values_, first_is_nulls_, last_is_nulls_; + std::shared_ptr type_; + ScalarAggregateOptions options_; +}; + +template +Result> FirstLastInit(KernelContext* ctx, + const KernelInitArgs& args) { + ARROW_ASSIGN_OR_RAISE(auto impl, HashAggregateInit>(ctx, args)); + static_cast*>(impl.get())->type_ = + args.inputs[0].GetSharedPtr(); + return std::move(impl); +} + +template +HashAggregateKernel MakeFirstOrLastKernel(HashAggregateFunction* first_last_func) { + HashAggregateKernel kernel; + kernel.init = [first_last_func]( + KernelContext* ctx, + const KernelInitArgs& args) -> Result> { + std::vector inputs = args.inputs; + ARROW_ASSIGN_OR_RAISE(auto kernel, first_last_func->DispatchExact(args.inputs)); + KernelInitArgs new_args{kernel, inputs, args.options}; + return kernel->init(ctx, new_args); + }; + + kernel.signature = + KernelSignature::Make({InputType::Any(), Type::UINT32}, OutputType(FirstType)); + kernel.resize = HashAggregateResize; + kernel.consume = HashAggregateConsume; + kernel.merge = HashAggregateMerge; + kernel.finalize = [](KernelContext* ctx, Datum* out) { + ARROW_ASSIGN_OR_RAISE(Datum temp, + checked_cast(ctx->state())->Finalize()); + *out = temp.array_as()->field(static_cast(first_or_last)); + return Status::OK(); + }; + kernel.ordered = true; + return kernel; +} + +struct GroupedFirstLastFactory { + template + enable_if_physical_integer Visit(const T&) { + using PhysicalType = typename T::PhysicalType; + kernel = MakeKernel(std::move(argument_type), FirstLastInit, + /*ordered*/ true); + return Status::OK(); + } + + Status Visit(const FloatType&) { + kernel = + MakeKernel(std::move(argument_type), FirstLastInit, /*ordered*/ true); + return Status::OK(); + } + + Status Visit(const DoubleType&) { + kernel = + MakeKernel(std::move(argument_type), FirstLastInit, /*ordered*/ true); + return Status::OK(); + } + + template + enable_if_base_binary Visit(const T&) { + kernel = MakeKernel(std::move(argument_type), FirstLastInit); + return Status::OK(); + } + + Status Visit(const FixedSizeBinaryType&) { + kernel = MakeKernel(std::move(argument_type), FirstLastInit); + return Status::OK(); + } + + Status Visit(const BooleanType&) { + kernel = MakeKernel(std::move(argument_type), FirstLastInit); + return Status::OK(); + } + + Status Visit(const HalfFloatType& type) { + return Status::NotImplemented("Computing first/last of data of type ", type); + } + + Status Visit(const DataType& type) { + return Status::NotImplemented("Computing first/last of data of type ", type); + } + + static Result Make(const std::shared_ptr& type) { + GroupedFirstLastFactory factory; + factory.argument_type = type->id(); + RETURN_NOT_OK(VisitTypeInline(*type, &factory)); + return factory.kernel; + } + + HashAggregateKernel 
kernel; + InputType argument_type; +}; + // ---------------------------------------------------------------------- // Any/All implementation @@ -2788,6 +3279,30 @@ const FunctionDoc hash_approximate_median_doc{ {"array", "group_id_array"}, "ScalarAggregateOptions"}; +const FunctionDoc hash_first_last_doc{ + "Compute the first and last values in each group", + ("Null values are ignored by default.\n" + "If skip_nulls = false, then this will return the first and last values\n" + "regardless of whether they are null."), + {"array", "group_id_array"}, + "ScalarAggregateOptions"}; + +const FunctionDoc hash_first_doc{ + "Compute the first value in each group", + ("Null values are ignored by default.\n" + "If skip_nulls = false, then this will return the first value\n" + "regardless of whether it is null."), + {"array", "group_id_array"}, + "ScalarAggregateOptions"}; + +const FunctionDoc hash_last_doc{ + "Compute the last value in each group", + ("Null values are ignored by default.\n" + "If skip_nulls = false, then this will return the last value\n" + "regardless of whether it is null."), + {"array", "group_id_array"}, + "ScalarAggregateOptions"}; + const FunctionDoc hash_min_max_doc{ "Compute the minimum and maximum of values in each group", ("Null values are ignored by default.\n" @@ -2961,6 +3476,40 @@ void RegisterHashAggregateBasic(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunction(std::move(func))); } + HashAggregateFunction* first_last_func = nullptr; + { + auto func = std::make_shared( + "hash_first_last", Arity::Binary(), hash_first_last_doc, + &default_scalar_aggregate_options); + + DCHECK_OK( + AddHashAggKernels(NumericTypes(), GroupedFirstLastFactory::Make, func.get())); + DCHECK_OK( + AddHashAggKernels(TemporalTypes(), GroupedFirstLastFactory::Make, func.get())); + DCHECK_OK( + AddHashAggKernels(BaseBinaryTypes(), GroupedFirstLastFactory::Make, func.get())); + DCHECK_OK(AddHashAggKernels({boolean(), fixed_size_binary(1)}, + GroupedFirstLastFactory::Make, func.get())); + + first_last_func = func.get(); + DCHECK_OK(registry->AddFunction(std::move(func))); + } + + { + auto func = std::make_shared( + "hash_first", Arity::Binary(), hash_first_doc, &default_scalar_aggregate_options); + DCHECK_OK( + func->AddKernel(MakeFirstOrLastKernel(first_last_func))); + DCHECK_OK(registry->AddFunction(std::move(func))); + } + + { + auto func = std::make_shared( + "hash_last", Arity::Binary(), hash_last_doc, &default_scalar_aggregate_options); + DCHECK_OK(func->AddKernel(MakeFirstOrLastKernel(first_last_func))); + DCHECK_OK(registry->AddFunction(std::move(func))); + } + HashAggregateFunction* min_max_func = nullptr; { auto func = std::make_shared( diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst index 183ede7ef588f..57fa2fd0cc2fa 100644 --- a/docs/source/cpp/compute.rst +++ b/docs/source/cpp/compute.rst @@ -212,8 +212,14 @@ the input to a single output value.
+--------------------+---------+------------------+------------------------+----------------------------------+-------+ | count_distinct | Unary | Non-nested types | Scalar Int64 | :struct:`CountOptions` | \(2) | +--------------------+---------+------------------+------------------------+----------------------------------+-------+ +| first | Unary | Numeric, Binary | Scalar Input type | :struct:`ScalarAggregateOptions` | \(11) | ++--------------------+---------+------------------+------------------------+----------------------------------+-------+ +| first_last | Unary | Numeric, Binary | Scalar Struct | :struct:`ScalarAggregateOptions` | \(11) | ++--------------------+---------+------------------+------------------------+----------------------------------+-------+ | index | Unary | Any | Scalar Int64 | :struct:`IndexOptions` | \(3) | +--------------------+---------+------------------+------------------------+----------------------------------+-------+ +| last | Unary | Numeric, Binary | Scalar Input type | :struct:`ScalarAggregateOptions` | \(11) | ++--------------------+---------+------------------+------------------------+----------------------------------+-------+ | max | Unary | Non-nested types | Scalar Input type | :struct:`ScalarAggregateOptions` | | +--------------------+---------+------------------+------------------------+----------------------------------+-------+ | mean | Unary | Numeric | Scalar Decimal/Float64 | :struct:`ScalarAggregateOptions` | \(4) | @@ -273,6 +279,8 @@ the input to a single output value. fixed amount of memory. See the `reference implementation `_ for details. +* \(11) Result is based on the ordering of input data + Decimal arguments are cast to Float64 first. .. _grouped-aggregations-group-by: @@ -340,6 +348,12 @@ equivalents above and reflects how they are implemented internally. +-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+ | hash_distinct | Unary | Any | List of input type | :struct:`CountOptions` | \(2) \(3) | +-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+ +| hash_first | Unary | Numeric, Binary | Input type | :struct:`ScalarAggregateOptions` | \(10) | ++-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+ +| hash_first_last | Unary | Numeric, Binary | Struct | :struct:`ScalarAggregateOptions` | \(10) | ++-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+ +| hash_last | Unary | Numeric, Binary | Input type | :struct:`ScalarAggregateOptions` | \(10) | ++-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+ | hash_list | Unary | Any | List of input type | | \(3) | +-------------------------+---------+------------------------------------+------------------------+----------------------------------+-----------+ | hash_max | Unary | Non-nested, non-binary/string-like | Input type | :struct:`ScalarAggregateOptions` | | @@ -398,6 +412,8 @@ equivalents above and reflects how they are implemented internally. fixed amount of memory. See the `reference implementation `_ for details. +* \(10) Result is based on ordering of the input data. + Decimal arguments are cast to Float64 first. 
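As a rough illustration of the ordering- and null-handling behaviour described in notes (10) and (11), the sketch below calls the "first_last" scalar aggregate through the generic compute API. It assumes the function is registered as the tables above indicate; the sample values, the FirstLastExample helper name, and the use of the ArrayFromJSON testing utility are illustrative and not part of this patch.

```cpp
#include <arrow/compute/api.h>
#include <arrow/result.h>
#include <arrow/testing/gtest_util.h>  // ArrayFromJSON test helper

// Returns the {first, last} struct scalar of a small int32 array.
arrow::Result<std::shared_ptr<arrow::Scalar>> FirstLastExample(bool skip_nulls) {
  auto array = arrow::ArrayFromJSON(arrow::int32(), "[null, 4, 7, null]");
  arrow::compute::ScalarAggregateOptions options(skip_nulls);
  ARROW_ASSIGN_OR_RAISE(
      arrow::Datum out,
      arrow::compute::CallFunction("first_last", {array}, &options));
  // skip_nulls=true  (default) -> {first: 4, last: 7}
  // skip_nulls=false           -> {first: null, last: null}, per the doc text above
  return out.scalar();
}
```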
Element-wise ("scalar") functions @@ -1412,7 +1428,7 @@ null input value is converted into a null output value. * \(4) Offsets are unchanged, the keys and values are cast from respective input to output types (if a conversion is available). If output type is a list of - struct, the key field is output as the first field and the value field the + struct, the key field is output as the first field and the value field the second field, regardless of field names chosen. * \(5) Any input type that can be cast to the resulting extension's storage type. @@ -1614,7 +1630,7 @@ an ``Invalid`` :class:`Status` when overflow is detected. * \(1) CumulativeSumOptions has two optional parameters. The first parameter :member:`CumulativeSumOptions::start` is a starting value for the running sum. It has a default value of 0. Specified values of ``start`` must have the - same type as the input. The second parameter + same type as the input. The second parameter :member:`CumulativeSumOptions::skip_nulls` is a boolean. When set to false (the default), the first encountered null is propagated. When set to true, each null in the input produces a corresponding null in the output.
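Finally, a minimal end-to-end sketch of the grouped variants registered above ("hash_first"/"hash_last") driven through an Acero aggregate declaration. The input table, the column names "key" and "value", and the FirstLastPerGroup helper are illustrative; use_threads=false is chosen here simply to keep a deterministic input order, since these kernels are marked ordered.

```cpp
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/api.h>
#include <arrow/compute/api.h>

// Group table `t` by "key" and take the first/last "value" per group.
arrow::Result<std::shared_ptr<arrow::Table>> FirstLastPerGroup(
    std::shared_ptr<arrow::Table> t) {
  namespace ac = arrow::acero;
  namespace cp = arrow::compute;
  auto options = std::make_shared<cp::ScalarAggregateOptions>(/*skip_nulls=*/true);
  ac::Declaration plan = ac::Declaration::Sequence({
      {"table_source", ac::TableSourceNodeOptions(std::move(t))},
      {"aggregate",
       ac::AggregateNodeOptions(
           /*aggregates=*/{{"hash_first", options, "value", "first(value)"},
                           {"hash_last", options, "value", "last(value)"}},
           /*keys=*/{"key"})},
  });
  // Single-threaded execution keeps batches in order for the ordered kernels.
  return ac::DeclarationToTable(std::move(plan), /*use_threads=*/false);
}
```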