diff --git a/src/common/enum_util.cpp b/src/common/enum_util.cpp index 5cdc03b3979..2eb2e075dd7 100644 --- a/src/common/enum_util.cpp +++ b/src/common/enum_util.cpp @@ -155,6 +155,29 @@ AccessMode EnumUtil::FromString(const char *value) { throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value)); } +template<> +const char* EnumUtil::ToChars(AggregateCombineType value) { + switch(value) { + case AggregateCombineType::PRESERVE_INPUT: + return "PRESERVE_INPUT"; + case AggregateCombineType::ALLOW_DESTRUCTIVE: + return "ALLOW_DESTRUCTIVE"; + default: + throw NotImplementedException(StringUtil::Format("Enum value: '%d' not implemented", value)); + } +} + +template<> +AggregateCombineType EnumUtil::FromString(const char *value) { + if (StringUtil::Equals(value, "PRESERVE_INPUT")) { + return AggregateCombineType::PRESERVE_INPUT; + } + if (StringUtil::Equals(value, "ALLOW_DESTRUCTIVE")) { + return AggregateCombineType::ALLOW_DESTRUCTIVE; + } + throw NotImplementedException(StringUtil::Format("Enum value: '%s' not implemented", value)); +} + template<> const char* EnumUtil::ToChars(AggregateHandling value) { switch(value) { diff --git a/src/common/row_operations/row_aggregate.cpp b/src/common/row_operations/row_aggregate.cpp index 6c89d887006..f6e9e6cbb37 100644 --- a/src/common/row_operations/row_aggregate.cpp +++ b/src/common/row_operations/row_aggregate.cpp @@ -82,7 +82,8 @@ void RowOperations::CombineStates(RowOperationsState &state, TupleDataLayout &la for (auto &aggr : layout.GetAggregates()) { D_ASSERT(aggr.function.combine); - AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator); + AggregateInputData aggr_input_data(aggr.GetFunctionData(), state.allocator, + AggregateCombineType::ALLOW_DESTRUCTIVE); aggr.function.combine(sources, targets, aggr_input_data, count); // Move to the next aggregate states diff --git a/src/common/types/list_segment.cpp b/src/common/types/list_segment.cpp index e5a41087d99..f0308268b3c 100644 --- a/src/common/types/list_segment.cpp +++ b/src/common/types/list_segment.cpp @@ -521,10 +521,8 @@ static void ReadDataFromArraySegment(const ListSegmentFunctions &functions, cons functions.child_functions[0].BuildListVector(linked_child_list, child_vector, child_size); } -void ListSegmentFunctions::BuildListVector(const LinkedList &linked_list, Vector &result, - idx_t &initial_total_count) const { +void ListSegmentFunctions::BuildListVector(const LinkedList &linked_list, Vector &result, idx_t total_count) const { auto &read_data_from_segment = *this; - idx_t total_count = initial_total_count; auto segment = linked_list.first_segment; while (segment) { read_data_from_segment.read_data(read_data_from_segment, segment, result, total_count); diff --git a/src/core_functions/aggregate/README.md b/src/core_functions/aggregate/README.md index f600e2ce338..8b9fa3477f3 100644 --- a/src/core_functions/aggregate/README.md +++ b/src/core_functions/aggregate/README.md @@ -127,9 +127,13 @@ the generator is `StateCombine` and the method it wraps is: Combine(const State& source, State &target, AggregateInputData &info) ``` -Note that the `sources` should _not_ be modified for efficiency because the caller may be using them -for multiple operations(e.g., window segment trees). -If you wish to combine destructively, you _must_ define a `window` function. +Note that the `source` should _not_ be modified for efficiency because the caller may be using them +for multiple operations (e.g., window segment trees). 
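To make the contract concrete, here is a rough sketch (not code from this patch) of a combine for a hypothetical linked-list state: it copies by default and only splices the source's nodes when the caller has opted in via the `combine_type` flag described in the next paragraph. `ExampleNode`, `ExampleListState`, and `AppendValue` are illustrative names, not DuckDB APIs.

```cpp
// Hypothetical state for illustration only: a singly linked list of values.
struct ExampleNode {
	int64_t value;
	ExampleNode *next;
};
struct ExampleListState {
	ExampleNode *head = nullptr;
	ExampleNode *tail = nullptr;
};

static void ExampleCombine(const ExampleListState &source, ExampleListState &target, AggregateInputData &info) {
	if (info.combine_type == AggregateCombineType::ALLOW_DESTRUCTIVE) {
		// The caller has promised not to reuse `source`, so splice its nodes onto
		// `target` instead of copying them (LIST does the same with its segments).
		auto &src = const_cast<ExampleListState &>(source);
		if (!target.head) {
			target.head = src.head;
		} else if (src.head) {
			target.tail->next = src.head;
		}
		if (src.tail) {
			target.tail = src.tail;
		}
		src.head = src.tail = nullptr;
		return;
	}
	// Default (PRESERVE_INPUT): the caller may combine `source` again, e.g. in a
	// window segment tree, so copy its values and leave the source untouched.
	for (auto node = source.head; node; node = node->next) {
		AppendValue(info.allocator, target, node->value); // hypothetical copy helper
	}
}
```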
+ +If you wish to combine destructively, you _must_ check that the `combine_type` member +of the `AggregateInputData` argument is set to `ALLOW_DESTRUCTIVE`. +This is useful when the aggregate can move data more efficiently than copying it. +`LIST` is an example, where the internal linked list data structures can be spliced instead of copied. The `combine` operation is optional, but it is needed for multi-threaded aggregation. If it is not provided, then _all_ aggregate functions in the grouping must be computed on a single thread. @@ -184,9 +188,6 @@ Window(const ArgType *arg, ValidityMask &filter, ValidityMask &valid, ResultType &result, idx_t rid, idx_tbias) ``` -Defining `window` is also useful if the aggregate wishes to use a destructive `combine` operation. -This may be tricky to implement efficiently. - ### Bind ```cpp diff --git a/src/core_functions/aggregate/nested/list.cpp b/src/core_functions/aggregate/nested/list.cpp index c6a00e80b0c..1895cd27bd5 100644 --- a/src/core_functions/aggregate/nested/list.cpp +++ b/src/core_functions/aggregate/nested/list.cpp @@ -67,7 +67,9 @@ static void ListUpdateFunction(Vector inputs[], AggregateInputData &aggr_input_d } } -static void ListCombineFunction(Vector &states_vector, Vector &combined, AggregateInputData &, idx_t count) { +static void ListAbsorbFunction(Vector &states_vector, Vector &combined, AggregateInputData &aggr_input_data, + idx_t count) { + D_ASSERT(aggr_input_data.combine_type == AggregateCombineType::ALLOW_DESTRUCTIVE); UnifiedVectorFormat states_data; states_vector.ToUnifiedFormat(count, states_data); @@ -147,58 +149,38 @@ static void ListFinalize(Vector &states_vector, AggregateInputData &aggr_input_d ListVector::SetListSize(result, total_len); } -static void ListWindow(AggregateInputData &aggr_input_data, const WindowPartitionInput &partition, - const_data_ptr_t g_state, data_ptr_t l_state, const SubFrames &frames, Vector &result, - idx_t rid) { +static void ListCombineFunction(Vector &states_vector, Vector &combined, AggregateInputData &aggr_input_data, + idx_t count) { - auto &list_bind_data = aggr_input_data.bind_data->Cast(); - LinkedList linked_list; - - // UPDATE step - - D_ASSERT(partition.input_count == 1); - // FIXME: We are modifying the window operator's data here - auto &input = const_cast(partition.inputs[0]); - - // FIXME: we unify more values than necessary (count is frame.end) - const auto count = frames.back().end; - - RecursiveUnifiedVectorFormat input_data; - Vector::RecursiveToUnifiedFormat(input, count, input_data); - - for (const auto &frame : frames) { - for (idx_t i = frame.start; i < frame.end; i++) { - list_bind_data.functions.AppendRow(aggr_input_data.allocator, linked_list, input_data, i); - } + // Can we use destructive combining? 
+ if (aggr_input_data.combine_type == AggregateCombineType::ALLOW_DESTRUCTIVE) { + ListAbsorbFunction(states_vector, combined, aggr_input_data, count); + return; } - // FINALIZE step - - D_ASSERT(result.GetType().id() == LogicalTypeId::LIST); - auto result_data = FlatVector::GetData(result); - size_t total_len = ListVector::GetListSize(result); + UnifiedVectorFormat states_data; + states_vector.ToUnifiedFormat(count, states_data); + auto states_ptr = UnifiedVectorFormat::GetData(states_data); + auto combined_ptr = FlatVector::GetData(combined); - // set the length and offset of this list in the result vector - result_data[rid].offset = total_len; - result_data[rid].length = linked_list.total_capacity; + auto &list_bind_data = aggr_input_data.bind_data->Cast(); + auto result_type = ListType::GetChildType(list_bind_data.stype); - // Empty frames produce NULL to track PG - if (!linked_list.total_capacity) { - auto &mask = FlatVector::Validity(result); - mask.SetInvalid(rid); - return; - } + for (idx_t i = 0; i < count; i++) { + auto &source = *states_ptr[states_data.sel->get_index(i)]; + auto &target = *combined_ptr[i]; - D_ASSERT(linked_list.total_capacity != 0); - total_len += linked_list.total_capacity; + const auto entry_count = source.linked_list.total_capacity; + Vector input(result_type, source.linked_list.total_capacity); + list_bind_data.functions.BuildListVector(source.linked_list, input, 0); - // reserve capacity, then copy over the data to the child vector - ListVector::Reserve(result, total_len); - auto &result_child = ListVector::GetEntry(result); - idx_t offset = result_data[rid].offset; - list_bind_data.functions.BuildListVector(linked_list, result_child, offset); + RecursiveUnifiedVectorFormat input_data; + Vector::RecursiveToUnifiedFormat(input, entry_count, input_data); - ListVector::SetListSize(result, total_len); + for (idx_t entry_idx = 0; entry_idx < entry_count; ++entry_idx) { + list_bind_data.functions.AppendRow(aggr_input_data.allocator, target.linked_list, input_data, entry_idx); + } + } } unique_ptr ListBindFunction(ClientContext &context, AggregateFunction &function, @@ -217,10 +199,12 @@ unique_ptr ListBindFunction(ClientContext &context, AggregateFunct } AggregateFunction ListFun::GetFunction() { - return AggregateFunction({LogicalType::ANY}, LogicalTypeId::LIST, AggregateFunction::StateSize, - AggregateFunction::StateInitialize, ListUpdateFunction, - ListCombineFunction, ListFinalize, nullptr, ListBindFunction, nullptr, nullptr, - ListWindow); + auto func = + AggregateFunction({LogicalType::ANY}, LogicalTypeId::LIST, AggregateFunction::StateSize, + AggregateFunction::StateInitialize, ListUpdateFunction, + ListCombineFunction, ListFinalize, nullptr, ListBindFunction, nullptr, nullptr, nullptr); + + return func; } } // namespace duckdb diff --git a/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp b/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp index bfdc4dc27aa..73dcc7c47a9 100644 --- a/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp +++ b/src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp @@ -354,7 +354,8 @@ SinkCombineResultType PhysicalUngroupedAggregate::Combine(ExecutionContext &cont Vector source_state(Value::POINTER(CastPointerToValue(lstate.state.aggregates[aggr_idx].get()))); Vector dest_state(Value::POINTER(CastPointerToValue(gstate.state.aggregates[aggr_idx].get()))); - AggregateInputData aggr_input_data(aggregate.bind_info.get(), gstate.allocator); + AggregateInputData 
aggr_input_data(aggregate.bind_info.get(), gstate.allocator, + AggregateCombineType::ALLOW_DESTRUCTIVE); aggregate.function.combine(source_state, dest_state, aggr_input_data, 1); #ifdef DEBUG gstate.state.counts[aggr_idx] += lstate.state.counts[aggr_idx]; @@ -548,7 +549,8 @@ void UngroupedDistinctAggregateFinalizeTask::AggregateDistinct() { } auto &aggregate = aggregates[agg_idx]->Cast(); - AggregateInputData aggr_input_data(aggregate.bind_info.get(), allocator); + AggregateInputData aggr_input_data(aggregate.bind_info.get(), allocator, + AggregateCombineType::ALLOW_DESTRUCTIVE); Vector state_vec(Value::POINTER(CastPointerToValue(state.aggregates[agg_idx].get()))); Vector combined_vec(Value::POINTER(CastPointerToValue(gstate.state.aggregates[agg_idx].get()))); diff --git a/src/execution/window_segment_tree.cpp b/src/execution/window_segment_tree.cpp index dffae752150..95c0e5ab1b4 100644 --- a/src/execution/window_segment_tree.cpp +++ b/src/execution/window_segment_tree.cpp @@ -639,6 +639,9 @@ WindowSegmentTree::~WindowSegmentTree() { class WindowSegmentTreePart { public: + //! Right side nodes need to be cached and processed in reverse order + using RightEntry = std::pair; + enum FramePart : uint8_t { FULL = 0, LEFT = 1, RIGHT = 2 }; WindowSegmentTreePart(ArenaAllocator &allocator, const AggregateObject &aggr, DataChunk &inputs, @@ -653,7 +656,7 @@ class WindowSegmentTreePart { void ExtractFrame(idx_t begin, idx_t end, data_ptr_t current_state); void WindowSegmentValue(const WindowSegmentTree &tree, idx_t l_idx, idx_t begin, idx_t end, data_ptr_t current_state); - //! optionally writes result and calls destructors + //! Writes result and calls destructors void Finalize(Vector &result, idx_t count); void Combine(WindowSegmentTreePart &other, idx_t count); @@ -661,12 +664,23 @@ class WindowSegmentTreePart { void Evaluate(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends, Vector &result, idx_t count, idx_t row_idx, FramePart frame_part); +protected: + //! Initialises the accumulation state vector (statef) + void Initialize(idx_t count); + //! Accumulate upper tree levels + void EvaluateUpperLevels(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends, idx_t count, + idx_t row_idx, FramePart frame_part); + void EvaluateLeaves(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends, idx_t count, + idx_t row_idx, FramePart frame_part, FramePart leaf_part); + public: //! Allocator for aggregates ArenaAllocator &allocator; //! The aggregate function const AggregateObject &aggr; - //! The aggregate function + //! Order insensitive aggregate (we can optimise internal combines) + const bool order_insensitive; + //! The partition arguments DataChunk &inputs; //! The filtered rows in inputs const ValidityMask &filter_mask; @@ -686,6 +700,8 @@ class WindowSegmentTreePart { Vector statef; //! Count of buffered values idx_t flush_count; + //! 
Cache of right side tree ranges for ordered aggregates + vector right_stack; }; class WindowSegmentTreeState : public WindowAggregatorState { @@ -708,9 +724,10 @@ class WindowSegmentTreeState : public WindowAggregatorState { WindowSegmentTreePart::WindowSegmentTreePart(ArenaAllocator &allocator, const AggregateObject &aggr, DataChunk &inputs, const ValidityMask &filter_mask) - : allocator(allocator), aggr(aggr), inputs(inputs), filter_mask(filter_mask), - state_size(aggr.function.state_size()), state(state_size * STANDARD_VECTOR_SIZE), statep(LogicalType::POINTER), - statel(LogicalType::POINTER), statef(LogicalType::POINTER), flush_count(0) { + : allocator(allocator), aggr(aggr), + order_insensitive(aggr.function.order_dependent == AggregateOrderDependent::NOT_ORDER_DEPENDENT), inputs(inputs), + filter_mask(filter_mask), state_size(aggr.function.state_size()), state(state_size * STANDARD_VECTOR_SIZE), + statep(LogicalType::POINTER), statel(LogicalType::POINTER), statef(LogicalType::POINTER), flush_count(0) { if (inputs.ColumnCount() > 0) { leaves.Initialize(Allocator::DefaultAllocator(), inputs.GetTypes()); filter_sel.Initialize(); @@ -900,18 +917,43 @@ void WindowSegmentTree::Evaluate(WindowAggregatorState &lstate, const DataChunk void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends, Vector &result, idx_t count, idx_t row_idx, FramePart frame_part) { + D_ASSERT(aggr.function.combine && tree.UseCombineAPI()); + + Initialize(count); + + if (order_insensitive) { + // First pass: aggregate the segment tree nodes with sharing + EvaluateUpperLevels(tree, begins, ends, count, row_idx, frame_part); + + // Second pass: aggregate the ragged leaves + EvaluateLeaves(tree, begins, ends, count, row_idx, frame_part, FramePart::FULL); + } else { + // Evaluate leaves in order + EvaluateLeaves(tree, begins, ends, count, row_idx, frame_part, FramePart::LEFT); + EvaluateUpperLevels(tree, begins, ends, count, row_idx, frame_part); + EvaluateLeaves(tree, begins, ends, count, row_idx, frame_part, FramePart::RIGHT); + } +} - const auto cant_combine = (!aggr.function.combine || !tree.UseCombineAPI()); +void WindowSegmentTreePart::Initialize(idx_t count) { + auto fdata = FlatVector::GetData(statef); + for (idx_t rid = 0; rid < count; ++rid) { + auto state_ptr = fdata[rid]; + aggr.function.initialize(state_ptr); + } +} + +void WindowSegmentTreePart::EvaluateUpperLevels(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends, + idx_t count, idx_t row_idx, FramePart frame_part) { auto fdata = FlatVector::GetData(statef); const auto exclude_mode = tree.exclude_mode; const bool begin_on_curr_row = frame_part == FramePart::RIGHT && exclude_mode == WindowExcludeMode::CURRENT_ROW; const bool end_on_curr_row = frame_part == FramePart::LEFT && exclude_mode == WindowExcludeMode::CURRENT_ROW; - // with EXCLUDE TIES, in addition to the frame part right of the peer group's end, we also need to consider the - // current row - const bool add_curr_row = frame_part == FramePart::RIGHT && exclude_mode == WindowExcludeMode::TIES; - // First pass: aggregate the segment tree nodes + const auto max_level = tree.levels_flat_start.size() + 1; + right_stack.resize(max_level, {0, 0}); + // Share adjacent identical states // We do this first because we want to share only tree aggregations idx_t prev_begin = 1; @@ -921,12 +963,6 @@ void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t data_ptr_t prev_state = nullptr; for (idx_t rid = 0, cur_row = 
row_idx; rid < count; ++rid, ++cur_row) { auto state_ptr = fdata[rid]; - aggr.function.initialize(state_ptr); - - if (cant_combine) { - // Make sure we initialise all states - continue; - } auto begin = begin_on_curr_row ? cur_row + 1 : begins[rid]; auto end = end_on_curr_row ? cur_row : ends[rid]; @@ -936,7 +972,8 @@ void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t // Skip level 0 idx_t l_idx = 0; - for (; l_idx < tree.levels_flat_start.size() + 1; l_idx++) { + idx_t right_max = 0; + for (; l_idx < max_level; l_idx++) { idx_t parent_begin = begin / tree.TREE_FANOUT; idx_t parent_end = end / tree.TREE_FANOUT; if (prev_state && l_idx == 1 && begin == prev_begin && end == prev_end) { @@ -949,7 +986,7 @@ void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t break; } - if (l_idx == 1) { + if (order_insensitive && l_idx == 1) { prev_state = state_ptr; prev_begin = begin; prev_end = end; @@ -971,17 +1008,52 @@ void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t idx_t group_end = parent_end * tree.TREE_FANOUT; if (end != group_end) { if (l_idx) { - WindowSegmentValue(tree, l_idx, group_end, end, state_ptr); + if (order_insensitive) { + WindowSegmentValue(tree, l_idx, group_end, end, state_ptr); + } else { + right_stack[l_idx] = {group_end, end}; + right_max = l_idx; + } } } begin = parent_begin; end = parent_end; } + + // Flush the right side values from left to right for order_sensitive aggregates + // As we go up the tree, the right side ranges move left, + // so we just cache them in a fixed size, preallocated array. + // Then we can just reverse scan the array and append the cached ranges. + for (l_idx = right_max; l_idx > 0; --l_idx) { + auto &right_entry = right_stack[l_idx]; + const auto group_end = right_entry.first; + const auto end = right_entry.second; + if (end) { + WindowSegmentValue(tree, l_idx, group_end, end, state_ptr); + right_entry = {0, 0}; + } + } } FlushStates(true); +} + +void WindowSegmentTreePart::EvaluateLeaves(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends, + idx_t count, idx_t row_idx, FramePart frame_part, FramePart leaf_part) { + + auto fdata = FlatVector::GetData(statef); + + // For order-sensitive aggregates, we have to process the ragged leaves in two pieces. + // The left side have to be added before the main tree followed by the ragged right sides. + // The current row is the leftmost value of the right hand side. 
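For a concrete feel of the decomposition performed here: assuming a fanout (`TREE_FANOUT`) of 16, a frame covering rows [5, 40) splits into the ragged left leaves [5, 16), the level-1 tree node covering [16, 32), and the ragged right leaves [32, 40). An order-insensitive aggregate can absorb those pieces in any order, but an order-sensitive one must see them strictly left to right, which is why `Evaluate` above calls `EvaluateLeaves` with `FramePart::LEFT`, then `EvaluateUpperLevels`, then `EvaluateLeaves` with `FramePart::RIGHT`.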
+ const bool compute_left = leaf_part != FramePart::RIGHT; + const bool compute_right = leaf_part != FramePart::LEFT; + const auto exclude_mode = tree.exclude_mode; + const bool begin_on_curr_row = frame_part == FramePart::RIGHT && exclude_mode == WindowExcludeMode::CURRENT_ROW; + const bool end_on_curr_row = frame_part == FramePart::LEFT && exclude_mode == WindowExcludeMode::CURRENT_ROW; + // with EXCLUDE TIES, in addition to the frame part right of the peer group's end, we also need to consider the + // current row + const bool add_curr_row = compute_left && frame_part == FramePart::RIGHT && exclude_mode == WindowExcludeMode::TIES; - // Second pass: aggregate the ragged leaves - // (or everything if we can't combine) for (idx_t rid = 0, cur_row = row_idx; rid < count; ++rid, ++cur_row) { auto state_ptr = fdata[rid]; @@ -994,21 +1066,21 @@ void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t continue; } - // Aggregate everything at once if we can't combine states idx_t parent_begin = begin / tree.TREE_FANOUT; idx_t parent_end = end / tree.TREE_FANOUT; - if (parent_begin == parent_end || cant_combine) { - WindowSegmentValue(tree, 0, begin, end, state_ptr); + if (parent_begin == parent_end) { + if (compute_left) { + WindowSegmentValue(tree, 0, begin, end, state_ptr); + } continue; } idx_t group_begin = parent_begin * tree.TREE_FANOUT; - if (begin != group_begin) { + if (begin != group_begin && compute_left) { WindowSegmentValue(tree, 0, begin, group_begin + tree.TREE_FANOUT, state_ptr); - parent_begin++; } idx_t group_end = parent_end * tree.TREE_FANOUT; - if (end != group_end) { + if (end != group_end && compute_right) { WindowSegmentValue(tree, 0, group_end, end, state_ptr); } } diff --git a/src/function/scalar/system/aggregate_export.cpp b/src/function/scalar/system/aggregate_export.cpp index e71255384f4..e6adcee0893 100644 --- a/src/function/scalar/system/aggregate_export.cpp +++ b/src/function/scalar/system/aggregate_export.cpp @@ -171,7 +171,7 @@ static void AggregateStateCombine(DataChunk &input, ExpressionState &state_p, Ve memcpy(local_state.state_buffer0.get(), state0.GetData(), bind_data.state_size); memcpy(local_state.state_buffer1.get(), state1.GetData(), bind_data.state_size); - AggregateInputData aggr_input_data(nullptr, local_state.allocator); + AggregateInputData aggr_input_data(nullptr, local_state.allocator, AggregateCombineType::ALLOW_DESTRUCTIVE); bind_data.aggr.combine(local_state.state_vector0, local_state.state_vector1, aggr_input_data, 1); result_ptr[i] = StringVector::AddStringOrBlob(result, const_char_ptr_cast(local_state.state_buffer1.get()), diff --git a/src/include/duckdb/common/enum_util.hpp b/src/include/duckdb/common/enum_util.hpp index 0908d1998f2..a577e8014de 100644 --- a/src/include/duckdb/common/enum_util.hpp +++ b/src/include/duckdb/common/enum_util.hpp @@ -34,6 +34,8 @@ struct EnumUtil { enum class AccessMode : uint8_t; +enum class AggregateCombineType : uint8_t; + enum class AggregateHandling : uint8_t; enum class AggregateOrderDependent : uint8_t; @@ -310,6 +312,9 @@ enum class WithinCollection : uint8_t; template<> const char* EnumUtil::ToChars(AccessMode value); +template<> +const char* EnumUtil::ToChars(AggregateCombineType value); + template<> const char* EnumUtil::ToChars(AggregateHandling value); @@ -722,6 +727,9 @@ const char* EnumUtil::ToChars(WithinCollection value); template<> AccessMode EnumUtil::FromString(const char *value); +template<> +AggregateCombineType EnumUtil::FromString(const char *value); + template<> 
AggregateHandling EnumUtil::FromString(const char *value); diff --git a/src/include/duckdb/common/types/list_segment.hpp b/src/include/duckdb/common/types/list_segment.hpp index ea4c2ad89f0..79f359faff1 100644 --- a/src/include/duckdb/common/types/list_segment.hpp +++ b/src/include/duckdb/common/types/list_segment.hpp @@ -51,7 +51,7 @@ struct ListSegmentFunctions { void AppendRow(ArenaAllocator &allocator, LinkedList &linked_list, RecursiveUnifiedVectorFormat &input_data, idx_t &entry_idx) const; - void BuildListVector(const LinkedList &linked_list, Vector &result, idx_t &initial_total_count) const; + void BuildListVector(const LinkedList &linked_list, Vector &result, idx_t total_count) const; }; void GetSegmentDataFunctions(ListSegmentFunctions &functions, const LogicalType &type); diff --git a/src/include/duckdb/function/aggregate_state.hpp b/src/include/duckdb/function/aggregate_state.hpp index 66b7338c4a5..9b0015d2d66 100644 --- a/src/include/duckdb/function/aggregate_state.hpp +++ b/src/include/duckdb/function/aggregate_state.hpp @@ -17,15 +17,19 @@ namespace duckdb { enum class AggregateType : uint8_t { NON_DISTINCT = 1, DISTINCT = 2 }; //! Whether or not the input order influences the result of the aggregate enum class AggregateOrderDependent : uint8_t { ORDER_DEPENDENT = 1, NOT_ORDER_DEPENDENT = 2 }; +//! Whether or not the combiner needs to preserve the source +enum class AggregateCombineType : uint8_t { PRESERVE_INPUT = 1, ALLOW_DESTRUCTIVE = 2 }; class BoundAggregateExpression; struct AggregateInputData { - AggregateInputData(optional_ptr bind_data_p, ArenaAllocator &allocator_p) - : bind_data(bind_data_p), allocator(allocator_p) { + AggregateInputData(optional_ptr bind_data_p, ArenaAllocator &allocator_p, + AggregateCombineType combine_type_p = AggregateCombineType::PRESERVE_INPUT) + : bind_data(bind_data_p), allocator(allocator_p), combine_type(combine_type_p) { } optional_ptr bind_data; ArenaAllocator &allocator; + AggregateCombineType combine_type; }; struct AggregateUnaryInput { diff --git a/test/sql/window/test_list_window.test b/test/sql/window/test_list_window.test index a335aeafff8..d39b8cb8184 100644 --- a/test/sql/window/test_list_window.test +++ b/test/sql/window/test_list_window.test @@ -48,3 +48,80 @@ SELECT FIRST(LIST_EXTRACT(l, 3)) FROM list_window GROUP BY g ORDER BY g; NULL NULL NULL + +statement ok +create table list_combine_test as + select range%3 j, + range::varchar AS s, + case when range%3=0 then '-' else '|' end sep + from range(1, 65) + +query III +select j, s, list(s) over (partition by j order by s) +from list_combine_test +order by j, s; +---- +0 12 [12] +0 15 [12, 15] +0 18 [12, 15, 18] +0 21 [12, 15, 18, 21] +0 24 [12, 15, 18, 21, 24] +0 27 [12, 15, 18, 21, 24, 27] +0 3 [12, 15, 18, 21, 24, 27, 3] +0 30 [12, 15, 18, 21, 24, 27, 3, 30] +0 33 [12, 15, 18, 21, 24, 27, 3, 30, 33] +0 36 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36] +0 39 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 39] +0 42 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 39, 42] +0 45 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 39, 42, 45] +0 48 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 39, 42, 45, 48] +0 51 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 39, 42, 45, 48, 51] +0 54 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 39, 42, 45, 48, 51, 54] +0 57 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57] +0 6 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 6] +0 60 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 6, 60] +0 63 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 
39, 42, 45, 48, 51, 54, 57, 6, 60, 63] +0 9 [12, 15, 18, 21, 24, 27, 3, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 6, 60, 63, 9] +1 1 [1] +1 10 [1, 10] +1 13 [1, 10, 13] +1 16 [1, 10, 13, 16] +1 19 [1, 10, 13, 16, 19] +1 22 [1, 10, 13, 16, 19, 22] +1 25 [1, 10, 13, 16, 19, 22, 25] +1 28 [1, 10, 13, 16, 19, 22, 25, 28] +1 31 [1, 10, 13, 16, 19, 22, 25, 28, 31] +1 34 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34] +1 37 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37] +1 4 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4] +1 40 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4, 40] +1 43 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4, 40, 43] +1 46 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4, 40, 43, 46] +1 49 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4, 40, 43, 46, 49] +1 52 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4, 40, 43, 46, 49, 52] +1 55 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4, 40, 43, 46, 49, 52, 55] +1 58 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4, 40, 43, 46, 49, 52, 55, 58] +1 61 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4, 40, 43, 46, 49, 52, 55, 58, 61] +1 64 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4, 40, 43, 46, 49, 52, 55, 58, 61, 64] +1 7 [1, 10, 13, 16, 19, 22, 25, 28, 31, 34, 37, 4, 40, 43, 46, 49, 52, 55, 58, 61, 64, 7] +2 11 [11] +2 14 [11, 14] +2 17 [11, 14, 17] +2 2 [11, 14, 17, 2] +2 20 [11, 14, 17, 2, 20] +2 23 [11, 14, 17, 2, 20, 23] +2 26 [11, 14, 17, 2, 20, 23, 26] +2 29 [11, 14, 17, 2, 20, 23, 26, 29] +2 32 [11, 14, 17, 2, 20, 23, 26, 29, 32] +2 35 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35] +2 38 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38] +2 41 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38, 41] +2 44 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38, 41, 44] +2 47 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47] +2 5 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 5] +2 50 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 5, 50] +2 53 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 5, 50, 53] +2 56 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 5, 50, 53, 56] +2 59 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 5, 50, 53, 56, 59] +2 62 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 5, 50, 53, 56, 59, 62] +2 8 [11, 14, 17, 2, 20, 23, 26, 29, 32, 35, 38, 41, 44, 47, 5, 50, 53, 56, 59, 62, 8]
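Taken together, the call-site pattern this patch establishes looks roughly like the sketch below. It is a simplified illustration of the combine calls shown above (row_aggregate.cpp, physical_ungrouped_aggregate.cpp, aggregate_export.cpp); the two helper functions and the single include are assumptions for the sake of a self-contained example, not code from the patch.

```cpp
#include "duckdb/function/aggregate_function.hpp"

namespace duckdb {

// A caller that still needs the source states afterwards (the window segment tree is the
// motivating case) keeps the PRESERVE_INPUT default, so combines must not modify the sources.
static void CombinePreserving(const AggregateFunction &function, optional_ptr<FunctionData> bind_data,
                              ArenaAllocator &allocator, Vector &sources, Vector &targets, idx_t count) {
	AggregateInputData aggr_input_data(bind_data, allocator);
	function.combine(sources, targets, aggr_input_data, count);
}

// A caller that owns the source states outright (hash aggregation, ungrouped aggregation)
// opts in to ALLOW_DESTRUCTIVE, letting aggregates such as LIST splice their linked lists
// instead of copying them.
static void CombineDestructively(const AggregateFunction &function, optional_ptr<FunctionData> bind_data,
                                 ArenaAllocator &allocator, Vector &sources, Vector &targets, idx_t count) {
	AggregateInputData aggr_input_data(bind_data, allocator, AggregateCombineType::ALLOW_DESTRUCTIVE);
	function.combine(sources, targets, aggr_input_data, count);
}

} // namespace duckdb
```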