From 8aa68d2eb7c9eb4641e292acaabfbae22ce52c9a Mon Sep 17 00:00:00 2001 From: Richard Wesley Date: Wed, 5 Apr 2023 15:26:05 -0700 Subject: [PATCH 1/9] Issue #5920: Ordered Aggregate Sorting Unified sorting of the inputs. This results in a 50% performance improvement for the list(c ORDER BY c) benchmark. --- .../aggregate/sorted_aggregate_function.cpp | 129 +++++++++++++----- .../expression/transform_function.cpp | 2 - 2 files changed, 97 insertions(+), 34 deletions(-) diff --git a/src/function/aggregate/sorted_aggregate_function.cpp b/src/function/aggregate/sorted_aggregate_function.cpp index 6de2bd3edf2..edea70ae975 100644 --- a/src/function/aggregate/sorted_aggregate_function.cpp +++ b/src/function/aggregate/sorted_aggregate_function.cpp @@ -4,6 +4,7 @@ #include "duckdb/function/function_binder.hpp" #include "duckdb/storage/buffer_manager.hpp" #include "duckdb/planner/expression/bound_aggregate_expression.hpp" +#include "duckdb/planner/expression/bound_constant_expression.hpp" #include "duckdb/parser/expression_map.hpp" #include "duckdb/function/aggregate/distributive_functions.hpp" @@ -187,16 +188,26 @@ struct SortedAggregateState { } } - void Finalize(SortedAggregateBindData &order_bind, LocalSortState &local_sort) { + void PrefixSortBuffer(DataChunk &prefixed) { + for (column_t col_idx = 0; col_idx < sort_buffer.ColumnCount(); ++col_idx) { + prefixed.data[col_idx + 1].Reference(sort_buffer.data[col_idx]); + } + prefixed.SetCardinality(sort_buffer); + } + + idx_t Finalize(SortedAggregateBindData &order_bind, DataChunk &prefixed, LocalSortState &local_sort) { + idx_t total = 0; if (arguments) { ColumnDataScanState sort_state; ordering->InitializeScan(sort_state); ColumnDataScanState arg_state; arguments->InitializeScan(arg_state); for (sort_buffer.Reset(); ordering->Scan(sort_state, sort_buffer); sort_buffer.Reset()) { + PrefixSortBuffer(prefixed); arg_buffer.Reset(); arguments->Scan(arg_state, arg_buffer); - local_sort.SinkChunk(sort_buffer, arg_buffer); + local_sort.SinkChunk(prefixed, arg_buffer); + total += sort_buffer.size(); } ordering->Reset(); arguments->Reset(); @@ -204,14 +215,21 @@ struct SortedAggregateState { ColumnDataScanState sort_state; ordering->InitializeScan(sort_state); for (sort_buffer.Reset(); ordering->Scan(sort_state, sort_buffer); sort_buffer.Reset()) { - local_sort.SinkChunk(sort_buffer, sort_buffer); + PrefixSortBuffer(prefixed); + local_sort.SinkChunk(prefixed, sort_buffer); + total += sort_buffer.size(); } ordering->Reset(); } else if (order_bind.sorted_on_args) { - local_sort.SinkChunk(sort_buffer, sort_buffer); + PrefixSortBuffer(prefixed); + local_sort.SinkChunk(prefixed, sort_buffer); + total += sort_buffer.size(); } else { - local_sort.SinkChunk(sort_buffer, arg_buffer); + PrefixSortBuffer(prefixed); + local_sort.SinkChunk(prefixed, arg_buffer); + total += sort_buffer.size(); } + return total; } unique_ptr arguments; @@ -333,21 +351,21 @@ struct SortedAggregateFunction { } static void Finalize(Vector &states, AggregateInputData &aggr_input_data, Vector &result, idx_t count, - idx_t offset) { + const idx_t offset) { const auto order_bind = (SortedAggregateBindData *)aggr_input_data.bind_data; auto &buffer_manager = order_bind->buffer_manager; - auto &orders = order_bind->orders; RowLayout payload_layout; payload_layout.Initialize(order_bind->arg_types); DataChunk chunk; chunk.Initialize(Allocator::DefaultAllocator(), order_bind->arg_types); + DataChunk sliced; + sliced.Initialize(Allocator::DefaultAllocator(), order_bind->arg_types); // Reusable inner state vector agg_state(order_bind->function.state_size()); Vector agg_state_vec(Value::POINTER((idx_t)agg_state.data())); // State variables - const auto input_count = order_bind->function.arguments.size(); auto bind_info = order_bind->bind_info.get(); AggregateInputData aggr_bind_info(bind_info, Allocator::DefaultAllocator()); @@ -359,43 +377,90 @@ struct SortedAggregateFunction { auto finalize = order_bind->function.finalize; auto sdata = FlatVector::GetData(states); + + // First pass: Sort all the input payloads at once on (state_idx ASC, orders) + vector orders; + orders.emplace_back(BoundOrderByNode(OrderType::ASCENDING, OrderByNullType::NULLS_FIRST, + make_uniq(Value::USMALLINT(0)))); + for (const auto &order : order_bind->orders) { + orders.emplace_back(order.Copy()); + } + + GlobalSortState global_sort(buffer_manager, orders, payload_layout); + LocalSortState local_sort; + local_sort.Initialize(global_sort, global_sort.buffer_manager); + + DataChunk prefixed; + prefixed.Initialize(Allocator::DefaultAllocator(), global_sort.sort_layout.logical_types); + + vector state_counts(count, 0); for (idx_t i = 0; i < count; ++i) { - initialize(agg_state.data()); auto state = sdata[i]; + prefixed.Reset(); + prefixed.data[0].Reference(Value::USMALLINT(i)); - // Apply the sort before delegating the chunks - auto global_sort = make_uniq(buffer_manager, orders, payload_layout); - LocalSortState local_sort; - local_sort.Initialize(*global_sort, global_sort->buffer_manager); - state->Finalize(*order_bind, local_sort); - global_sort->AddLocalState(local_sort); - - if (!global_sort->sorted_blocks.empty()) { - global_sort->PrepareMergePhase(); - while (global_sort->sorted_blocks.size() > 1) { - global_sort->InitializeMergeRound(); - MergeSorter merge_sorter(*global_sort, global_sort->buffer_manager); - merge_sorter.PerformInMergeRound(); - global_sort->CompleteMergeRound(false); - } + state_counts[i] = state->Finalize(*order_bind, prefixed, local_sort); + } + global_sort.AddLocalState(local_sort); + + // Second pass: scan the sorted data until we are done and pass slices to be aggregated. + idx_t i = 0; + if (!global_sort.sorted_blocks.empty()) { + // Sort all the data + global_sort.PrepareMergePhase(); + while (global_sort.sorted_blocks.size() > 1) { + global_sort.InitializeMergeRound(); + MergeSorter merge_sorter(global_sort, global_sort.buffer_manager); + merge_sorter.PerformInMergeRound(); + global_sort.CompleteMergeRound(false); + } - PayloadScanner scanner(*global_sort); - for (;;) { - chunk.Reset(); - scanner.Scan(chunk); - if (chunk.size() == 0) { - break; + PayloadScanner scanner(global_sort); + initialize(agg_state.data()); + while (scanner.Remaining()) { + chunk.Reset(); + scanner.Scan(chunk); + idx_t consumed = 0; + + // Distribute the rows to the aggregates + while (consumed < chunk.size()) { + // Find the next aggregate that needs data + for (; !state_counts[i]; ++i) { + // Finalize a single value at the next offset + agg_state_vec.SetVectorType(states.GetVectorType()); + finalize(agg_state_vec, aggr_bind_info, result, 1, i + offset); + if (destructor) { + destructor(agg_state_vec, 1); + } + + initialize(agg_state.data()); } + const auto input_count = MinValue(state_counts[i], chunk.size() - consumed); + for (column_t col_idx = 0; col_idx < chunk.ColumnCount(); ++col_idx) { + sliced.data[col_idx].Slice(chunk.data[col_idx], consumed, consumed + input_count); + } + sliced.SetCardinality(input_count); + // These are all simple updates, so use it if available if (simple_update) { - simple_update(chunk.data.data(), aggr_bind_info, input_count, agg_state.data(), chunk.size()); + simple_update(sliced.data.data(), aggr_bind_info, 1, agg_state.data(), sliced.size()); } else { // We are only updating a constant state agg_state_vec.SetVectorType(VectorType::CONSTANT_VECTOR); - update(chunk.data.data(), aggr_bind_info, input_count, agg_state_vec, chunk.size()); + update(sliced.data.data(), aggr_bind_info, 1, agg_state_vec, sliced.size()); } + + consumed += input_count; + state_counts[i] -= input_count; } } + finalize(agg_state_vec, aggr_bind_info, result, 1, i + offset); + ++i; + } + + // Finish any trailing empty groups + for (; i < count; ++i) { + initialize(agg_state.data()); // Finalize a single value at the next offset agg_state_vec.SetVectorType(states.GetVectorType()); diff --git a/src/parser/transform/expression/transform_function.cpp b/src/parser/transform/expression/transform_function.cpp index 851c8b0df04..9355d58fa11 100644 --- a/src/parser/transform/expression/transform_function.cpp +++ b/src/parser/transform/expression/transform_function.cpp @@ -299,8 +299,6 @@ unique_ptr Transformer::TransformFuncCall(duckdb_libpgquery::P std::move(filter_expr), std::move(order_bys), root->agg_distinct, false, root->export_state); lowercase_name = "list_sort"; - order_bys.reset(); - filter_expr.reset(); children.clear(); children.emplace_back(std::move(unordered)); children.emplace_back(std::move(sense)); From f7ec75dd8779a13780b69fea367c7194f40e230e Mon Sep 17 00:00:00 2001 From: Richard Wesley Date: Thu, 6 Apr 2023 09:00:35 -0700 Subject: [PATCH 2/9] Issue #5920: Ordered Aggregate Sorting Fix memory leak on last aggregate. --- src/function/aggregate/sorted_aggregate_function.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/function/aggregate/sorted_aggregate_function.cpp b/src/function/aggregate/sorted_aggregate_function.cpp index edea70ae975..1eb60b9a69e 100644 --- a/src/function/aggregate/sorted_aggregate_function.cpp +++ b/src/function/aggregate/sorted_aggregate_function.cpp @@ -455,6 +455,10 @@ struct SortedAggregateFunction { } } finalize(agg_state_vec, aggr_bind_info, result, 1, i + offset); + if (destructor) { + destructor(agg_state_vec, 1); + } + ++i; } From 1c31643c396563efb6f05341dd15a75c69be5cf1 Mon Sep 17 00:00:00 2001 From: Richard Wesley Date: Thu, 6 Apr 2023 12:21:46 -0700 Subject: [PATCH 3/9] Issue #5920: Ordered Aggregate Sorting Fix constant agg_state_vec on last non-empty aggregate. --- src/function/aggregate/sorted_aggregate_function.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/function/aggregate/sorted_aggregate_function.cpp b/src/function/aggregate/sorted_aggregate_function.cpp index 1eb60b9a69e..00d72bdd643 100644 --- a/src/function/aggregate/sorted_aggregate_function.cpp +++ b/src/function/aggregate/sorted_aggregate_function.cpp @@ -454,6 +454,8 @@ struct SortedAggregateFunction { state_counts[i] -= input_count; } } + + agg_state_vec.SetVectorType(states.GetVectorType()); finalize(agg_state_vec, aggr_bind_info, result, 1, i + offset); if (destructor) { destructor(agg_state_vec, 1); From e07d18bb908643fa334fe147ea90b58ae72644b6 Mon Sep 17 00:00:00 2001 From: Richard Wesley Date: Tue, 11 Apr 2023 10:46:56 -0700 Subject: [PATCH 4/9] Issue #5920: Ordered Aggregate Sorting Update destructor calls. --- src/function/aggregate/sorted_aggregate_function.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/function/aggregate/sorted_aggregate_function.cpp b/src/function/aggregate/sorted_aggregate_function.cpp index 1fd0f33cf63..ad0c13d74eb 100644 --- a/src/function/aggregate/sorted_aggregate_function.cpp +++ b/src/function/aggregate/sorted_aggregate_function.cpp @@ -430,7 +430,7 @@ struct SortedAggregateFunction { agg_state_vec.SetVectorType(states.GetVectorType()); finalize(agg_state_vec, aggr_bind_info, result, 1, i + offset); if (destructor) { - destructor(agg_state_vec, 1); + destructor(agg_state_vec, aggr_bind_info, 1); } initialize(agg_state.data()); @@ -458,7 +458,7 @@ struct SortedAggregateFunction { agg_state_vec.SetVectorType(states.GetVectorType()); finalize(agg_state_vec, aggr_bind_info, result, 1, i + offset); if (destructor) { - destructor(agg_state_vec, 1); + destructor(agg_state_vec, aggr_bind_info, 1); } ++i; From 5f5b2143fd35a9ae71915b977e4aed3f1c658884 Mon Sep 17 00:00:00 2001 From: Richard Wesley Date: Tue, 11 Apr 2023 12:38:06 -0700 Subject: [PATCH 5/9] Issue #5920: Ordered Aggregate Sorting Tidy fix for move/reuse. --- src/parser/transform/expression/transform_function.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/parser/transform/expression/transform_function.cpp b/src/parser/transform/expression/transform_function.cpp index 20e7a049df8..02bed178af4 100644 --- a/src/parser/transform/expression/transform_function.cpp +++ b/src/parser/transform/expression/transform_function.cpp @@ -299,6 +299,8 @@ unique_ptr Transformer::TransformFuncCall(duckdb_libpgquery::P std::move(filter_expr), std::move(order_bys), root->agg_distinct, false, root->export_state); lowercase_name = "list_sort"; + order_bys.reset(); + filter_expr.reset(); children.clear(); children.emplace_back(std::move(unordered)); children.emplace_back(std::move(sense)); From 54459684f290bfcdce5d55b47f651c4eefb3e114 Mon Sep 17 00:00:00 2001 From: Richard Wesley Date: Thu, 13 Apr 2023 12:55:10 -0700 Subject: [PATCH 6/9] Issue #5920: Ordered Aggregate Sorting * Add a sorting threshold to make sure that we limit the amount of sorted data. * Honour the external sorting flag if set. --- .../micro/aggregate/ordered_first.benchmark | 11 + .../aggregate/sorted_aggregate_function.cpp | 192 +++++++++++------- src/include/duckdb/main/client_config.hpp | 2 + src/include/duckdb/main/settings.hpp | 10 + src/main/config.cpp | 1 + src/main/settings/settings.cpp | 16 ++ 6 files changed, 155 insertions(+), 77 deletions(-) create mode 100644 benchmark/micro/aggregate/ordered_first.benchmark diff --git a/benchmark/micro/aggregate/ordered_first.benchmark b/benchmark/micro/aggregate/ordered_first.benchmark new file mode 100644 index 00000000000..99d08a20e6e --- /dev/null +++ b/benchmark/micro/aggregate/ordered_first.benchmark @@ -0,0 +1,11 @@ +# name: benchmark/micro/aggregate/grouped_distinct.benchmark +# description: FIRST(i ORDER BY i) over a bunch of integers +# group: [aggregate] + +name Ordered First (Grouped) +group aggregate + +run +SELECT i % 2048 AS grp, FIRST(i ORDER BY i DESC) AS agg +FROM range(10000000) tbl(i) +GROUP BY ALL; diff --git a/src/function/aggregate/sorted_aggregate_function.cpp b/src/function/aggregate/sorted_aggregate_function.cpp index ad0c13d74eb..d89cc79d645 100644 --- a/src/function/aggregate/sorted_aggregate_function.cpp +++ b/src/function/aggregate/sorted_aggregate_function.cpp @@ -13,7 +13,8 @@ namespace duckdb { struct SortedAggregateBindData : public FunctionData { SortedAggregateBindData(ClientContext &context, BoundAggregateExpression &expr) : buffer_manager(BufferManager::GetBufferManager(context)), function(expr.function), - bind_info(std::move(expr.bind_info)) { + bind_info(std::move(expr.bind_info)), + threshold(ClientConfig::GetConfig(context).ordered_aggregate_threshold), external(ClientConfig::GetConfig(context).force_external) { auto &children = expr.children; arg_types.reserve(children.size()); for (const auto &child : children) { @@ -33,7 +34,7 @@ struct SortedAggregateBindData : public FunctionData { SortedAggregateBindData(const SortedAggregateBindData &other) : buffer_manager(other.buffer_manager), function(other.function), arg_types(other.arg_types), - sort_types(other.sort_types), sorted_on_args(other.sorted_on_args) { + sort_types(other.sort_types), sorted_on_args(other.sorted_on_args), threshold(other.threshold), external(other.external) { if (other.bind_info) { bind_info = other.bind_info->Copy(); } @@ -77,13 +78,17 @@ struct SortedAggregateBindData : public FunctionData { vector orders; vector sort_types; bool sorted_on_args; + + //! The sort flush threshold + const idx_t threshold; + const bool external; }; struct SortedAggregateState { //! Default buffer size, optimised for small group to avoid blowing out memory. static const idx_t BUFFER_CAPACITY = 16; - SortedAggregateState() : nsel(0), offset(0) { + SortedAggregateState() : count(0), nsel(0), offset(0) { } static inline void InitializeBuffer(DataChunk &chunk, const vector &types) { @@ -99,7 +104,7 @@ struct SortedAggregateState { chunk.Initialize(Allocator::DefaultAllocator(), types); } - void Flush(SortedAggregateBindData &order_bind) { + void Flush(const SortedAggregateBindData &order_bind) { if (ordering) { return; } @@ -117,7 +122,9 @@ struct SortedAggregateState { } } - void Update(SortedAggregateBindData &order_bind, DataChunk &sort_chunk, DataChunk &arg_chunk) { + void Update(const SortedAggregateBindData &order_bind, DataChunk &sort_chunk, DataChunk &arg_chunk) { + count += sort_chunk.size(); + // Lazy instantiation of the buffer chunks InitializeBuffer(sort_buffer, order_bind.sort_types); if (!order_bind.sorted_on_args) { @@ -140,7 +147,9 @@ struct SortedAggregateState { } } - void UpdateSlice(SortedAggregateBindData &order_bind, DataChunk &sort_inputs, DataChunk &arg_inputs) { + void UpdateSlice(const SortedAggregateBindData &order_bind, DataChunk &sort_inputs, DataChunk &arg_inputs) { + count += nsel; + // Lazy instantiation of the buffer chunks InitializeBuffer(sort_buffer, order_bind.sort_types); if (!order_bind.sorted_on_args) { @@ -195,8 +204,7 @@ struct SortedAggregateState { prefixed.SetCardinality(sort_buffer); } - idx_t Finalize(SortedAggregateBindData &order_bind, DataChunk &prefixed, LocalSortState &local_sort) { - idx_t total = 0; + void Finalize(const SortedAggregateBindData &order_bind, DataChunk &prefixed, LocalSortState &local_sort) { if (arguments) { ColumnDataScanState sort_state; ordering->InitializeScan(sort_state); @@ -207,7 +215,6 @@ struct SortedAggregateState { arg_buffer.Reset(); arguments->Scan(arg_state, arg_buffer); local_sort.SinkChunk(prefixed, arg_buffer); - total += sort_buffer.size(); } ordering->Reset(); arguments->Reset(); @@ -217,21 +224,18 @@ struct SortedAggregateState { for (sort_buffer.Reset(); ordering->Scan(sort_state, sort_buffer); sort_buffer.Reset()) { PrefixSortBuffer(prefixed); local_sort.SinkChunk(prefixed, sort_buffer); - total += sort_buffer.size(); } ordering->Reset(); } else if (order_bind.sorted_on_args) { PrefixSortBuffer(prefixed); local_sort.SinkChunk(prefixed, sort_buffer); - total += sort_buffer.size(); } else { PrefixSortBuffer(prefixed); local_sort.SinkChunk(prefixed, arg_buffer); - total += sort_buffer.size(); } - return total; } + idx_t count; unique_ptr arguments; unique_ptr ordering; @@ -255,19 +259,19 @@ struct SortedAggregateFunction { state->~STATE(); } - static void ProjectInputs(Vector inputs[], SortedAggregateBindData *order_bind, idx_t input_count, idx_t count, - DataChunk &arg_chunk, DataChunk &sort_chunk) { + static void ProjectInputs(Vector inputs[], const SortedAggregateBindData &order_bind, idx_t input_count, + idx_t count, DataChunk &arg_chunk, DataChunk &sort_chunk) { idx_t col = 0; - if (!order_bind->sorted_on_args) { - arg_chunk.InitializeEmpty(order_bind->arg_types); + if (!order_bind.sorted_on_args) { + arg_chunk.InitializeEmpty(order_bind.arg_types); for (auto &dst : arg_chunk.data) { dst.Reference(inputs[col++]); } arg_chunk.SetCardinality(count); } - sort_chunk.InitializeEmpty(order_bind->sort_types); + sort_chunk.InitializeEmpty(order_bind.sort_types); for (auto &dst : sort_chunk.data) { dst.Reference(inputs[col++]); } @@ -276,13 +280,13 @@ struct SortedAggregateFunction { static void SimpleUpdate(Vector inputs[], AggregateInputData &aggr_input_data, idx_t input_count, data_ptr_t state, idx_t count) { - const auto order_bind = (SortedAggregateBindData *)aggr_input_data.bind_data; + const auto order_bind = aggr_input_data.bind_data->Cast(); DataChunk arg_chunk; DataChunk sort_chunk; ProjectInputs(inputs, order_bind, input_count, count, arg_chunk, sort_chunk); const auto order_state = (SortedAggregateState *)state; - order_state->Update(*order_bind, sort_chunk, arg_chunk); + order_state->Update(order_bind, sort_chunk, arg_chunk); } static void ScatterUpdate(Vector inputs[], AggregateInputData &aggr_input_data, idx_t input_count, Vector &states, @@ -292,7 +296,7 @@ struct SortedAggregateFunction { } // Append the arguments to the two sub-collections - const auto order_bind = (SortedAggregateBindData *)aggr_input_data.bind_data; + const auto &order_bind = aggr_input_data.bind_data->Cast(); DataChunk arg_inputs; DataChunk sort_inputs; ProjectInputs(inputs, order_bind, input_count, count, arg_inputs, sort_inputs); @@ -333,7 +337,7 @@ struct SortedAggregateFunction { continue; } - order_state->UpdateSlice(*order_bind, sort_inputs, arg_inputs); + order_state->UpdateSlice(order_bind, sort_inputs, arg_inputs); } } @@ -352,90 +356,118 @@ struct SortedAggregateFunction { static void Finalize(Vector &states, AggregateInputData &aggr_input_data, Vector &result, idx_t count, const idx_t offset) { - const auto order_bind = (SortedAggregateBindData *)aggr_input_data.bind_data; - auto &buffer_manager = order_bind->buffer_manager; + const auto &order_bind = aggr_input_data.bind_data->Cast(); + auto &buffer_manager = order_bind.buffer_manager; RowLayout payload_layout; - payload_layout.Initialize(order_bind->arg_types); + payload_layout.Initialize(order_bind.arg_types); DataChunk chunk; - chunk.Initialize(Allocator::DefaultAllocator(), order_bind->arg_types); + chunk.Initialize(Allocator::DefaultAllocator(), order_bind.arg_types); DataChunk sliced; - sliced.Initialize(Allocator::DefaultAllocator(), order_bind->arg_types); + sliced.Initialize(Allocator::DefaultAllocator(), order_bind.arg_types); // Reusable inner state - vector agg_state(order_bind->function.state_size()); + vector agg_state(order_bind.function.state_size()); Vector agg_state_vec(Value::POINTER((idx_t)agg_state.data())); // State variables - auto bind_info = order_bind->bind_info.get(); + auto bind_info = order_bind.bind_info.get(); AggregateInputData aggr_bind_info(bind_info, Allocator::DefaultAllocator()); // Inner aggregate APIs - auto initialize = order_bind->function.initialize; - auto destructor = order_bind->function.destructor; - auto simple_update = order_bind->function.simple_update; - auto update = order_bind->function.update; - auto finalize = order_bind->function.finalize; + auto initialize = order_bind.function.initialize; + auto destructor = order_bind.function.destructor; + auto simple_update = order_bind.function.simple_update; + auto update = order_bind.function.update; + auto finalize = order_bind.function.finalize; auto sdata = FlatVector::GetData(states); - // First pass: Sort all the input payloads at once on (state_idx ASC, orders) + vector state_unprocessed(count, 0); + for (idx_t i = 0; i < count; ++i) { + state_unprocessed[i] = sdata[i]->count; + } + + // Sort the input payloads on (state_idx ASC, orders) vector orders; orders.emplace_back(BoundOrderByNode(OrderType::ASCENDING, OrderByNullType::NULLS_FIRST, make_uniq(Value::USMALLINT(0)))); - for (const auto &order : order_bind->orders) { + for (const auto &order : order_bind.orders) { orders.emplace_back(order.Copy()); } - GlobalSortState global_sort(buffer_manager, orders, payload_layout); - LocalSortState local_sort; - local_sort.Initialize(global_sort, global_sort.buffer_manager); + auto global_sort = make_uniq(buffer_manager, orders, payload_layout); + global_sort->external = order_bind.external; + auto local_sort = make_uniq(); + local_sort->Initialize(*global_sort, global_sort->buffer_manager); DataChunk prefixed; - prefixed.Initialize(Allocator::DefaultAllocator(), global_sort.sort_layout.logical_types); + prefixed.Initialize(Allocator::DefaultAllocator(), global_sort->sort_layout.logical_types); + + // Go through the states accumulating values to sort until we hit the sort threshold + idx_t unsorted_count = 0; + idx_t sorted = 0; + for (idx_t finalized = 0; finalized < count; ++finalized) { + auto state = sdata[finalized]; + if (unsorted_count < order_bind.threshold) { + prefixed.Reset(); + prefixed.data[0].Reference(Value::USMALLINT(finalized)); + state->Finalize(order_bind, prefixed, *local_sort); + unsorted_count += state_unprocessed[finalized]; + + // Go to the next aggregate unless this is the last one + if (finalized + 1 < count) { + continue; + } + } - vector state_counts(count, 0); - for (idx_t i = 0; i < count; ++i) { - auto state = sdata[i]; - prefixed.Reset(); - prefixed.data[0].Reference(Value::USMALLINT(i)); + // If they were all empty (filtering) flush them + if (!unsorted_count) { + // This can only happen on the last range, so get them all + for (; sorted < count; ++sorted) { + initialize(agg_state.data()); - state_counts[i] = state->Finalize(*order_bind, prefixed, local_sort); - } - global_sort.AddLocalState(local_sort); + // Finalize a single value at the next offset + agg_state_vec.SetVectorType(states.GetVectorType()); + finalize(agg_state_vec, aggr_bind_info, result, 1, sorted + offset); + + if (destructor) { + destructor(agg_state_vec, aggr_bind_info, 1); + } + } + break; + } - // Second pass: scan the sorted data until we are done and pass slices to be aggregated. - idx_t i = 0; - if (!global_sort.sorted_blocks.empty()) { // Sort all the data - global_sort.PrepareMergePhase(); - while (global_sort.sorted_blocks.size() > 1) { - global_sort.InitializeMergeRound(); - MergeSorter merge_sorter(global_sort, global_sort.buffer_manager); + global_sort->AddLocalState(*local_sort); + global_sort->PrepareMergePhase(); + while (global_sort->sorted_blocks.size() > 1) { + global_sort->InitializeMergeRound(); + MergeSorter merge_sorter(*global_sort, global_sort->buffer_manager); merge_sorter.PerformInMergeRound(); - global_sort.CompleteMergeRound(false); + global_sort->CompleteMergeRound(false); } - PayloadScanner scanner(global_sort); + auto scanner = make_uniq(*global_sort); initialize(agg_state.data()); - while (scanner.Remaining()) { + while (scanner->Remaining()) { chunk.Reset(); - scanner.Scan(chunk); + scanner->Scan(chunk); idx_t consumed = 0; - // Distribute the rows to the aggregates + // Distribute the scanned chunk to the aggregates while (consumed < chunk.size()) { // Find the next aggregate that needs data - for (; !state_counts[i]; ++i) { + for (; !state_unprocessed[sorted]; ++sorted) { // Finalize a single value at the next offset agg_state_vec.SetVectorType(states.GetVectorType()); - finalize(agg_state_vec, aggr_bind_info, result, 1, i + offset); + finalize(agg_state_vec, aggr_bind_info, result, 1, sorted + offset); if (destructor) { destructor(agg_state_vec, aggr_bind_info, 1); } initialize(agg_state.data()); } - const auto input_count = MinValue(state_counts[i], chunk.size() - consumed); + const auto input_count = MinValue(state_unprocessed[sorted], chunk.size() - consumed); for (column_t col_idx = 0; col_idx < chunk.ColumnCount(); ++col_idx) { sliced.data[col_idx].Slice(chunk.data[col_idx], consumed, consumed + input_count); } @@ -451,30 +483,36 @@ struct SortedAggregateFunction { } consumed += input_count; - state_counts[i] -= input_count; + state_unprocessed[sorted] -= input_count; } } + // Finalize the last state for this sort agg_state_vec.SetVectorType(states.GetVectorType()); - finalize(agg_state_vec, aggr_bind_info, result, 1, i + offset); + finalize(agg_state_vec, aggr_bind_info, result, 1, sorted + offset); if (destructor) { destructor(agg_state_vec, aggr_bind_info, 1); } + ++sorted; - ++i; - } - - // Finish any trailing empty groups - for (; i < count; ++i) { - initialize(agg_state.data()); + // Stop if we are done + if (finalized + 1 >= count) { + break; + } - // Finalize a single value at the next offset - agg_state_vec.SetVectorType(states.GetVectorType()); - finalize(agg_state_vec, aggr_bind_info, result, 1, i + offset); + // Create a new sort + scanner.reset(); + global_sort = make_uniq(buffer_manager, orders, payload_layout); + global_sort->external = order_bind.external; + local_sort = make_uniq(); + local_sort->Initialize(*global_sort, global_sort->buffer_manager); + unsorted_count = 0; - if (destructor) { - destructor(agg_state_vec, aggr_bind_info, 1); - } + // Add the current values to it + prefixed.Reset(); + prefixed.data[0].Reference(Value::USMALLINT(finalized)); + state->Finalize(order_bind, prefixed, *local_sort); + unsorted_count += state_unprocessed[finalized]; } } diff --git a/src/include/duckdb/main/client_config.hpp b/src/include/duckdb/main/client_config.hpp index 93826973692..9efd8d816a8 100644 --- a/src/include/duckdb/main/client_config.hpp +++ b/src/include/duckdb/main/client_config.hpp @@ -77,6 +77,8 @@ struct ClientConfig { //! Maximum bits allowed for using a perfect hash table (i.e. the perfect HT can hold up to 2^perfect_ht_threshold //! elements) idx_t perfect_ht_threshold = 12; + //! The maximum expression depth limit in the parser + idx_t ordered_aggregate_threshold = (idx_t(1) << 18); //! Callback to create a progress bar display progress_bar_display_create_func_t display_create_func = nullptr; diff --git a/src/include/duckdb/main/settings.hpp b/src/include/duckdb/main/settings.hpp index b245cd59353..7164bfe9e17 100644 --- a/src/include/duckdb/main/settings.hpp +++ b/src/include/duckdb/main/settings.hpp @@ -65,6 +65,16 @@ struct DebugForceNoCrossProduct { static Value GetSetting(ClientContext &context); }; +struct DebugOrderedAggregateThreshold { + static constexpr const char *Name = "debug_ordered_aggregate_threshold"; + static constexpr const char *Description = + "DEBUG SETTING: the number of rows to accumulate before sorting, used for tuning"; + static constexpr const LogicalTypeId InputType = LogicalTypeId::UBIGINT; + static void SetLocal(ClientContext &context, const Value ¶meter); + static void ResetLocal(ClientContext &context); + static Value GetSetting(ClientContext &context); +}; + struct DebugWindowMode { static constexpr const char *Name = "debug_window_mode"; static constexpr const char *Description = "DEBUG SETTING: switch window mode to use"; diff --git a/src/main/config.cpp b/src/main/config.cpp index 302552fe10c..f34ae75b19b 100644 --- a/src/main/config.cpp +++ b/src/main/config.cpp @@ -54,6 +54,7 @@ static ConfigurationOption internal_options[] = {DUCKDB_GLOBAL(AccessModeSetting DUCKDB_GLOBAL(DebugCheckpointAbort), DUCKDB_LOCAL(DebugForceExternal), DUCKDB_LOCAL(DebugForceNoCrossProduct), + DUCKDB_LOCAL(DebugOrderedAggregateThreshold), DUCKDB_GLOBAL(DebugWindowMode), DUCKDB_GLOBAL_LOCAL(DefaultCollationSetting), DUCKDB_GLOBAL(DefaultOrderSetting), diff --git a/src/main/settings/settings.cpp b/src/main/settings/settings.cpp index 6efeb80f5d6..af11e7d5ed7 100644 --- a/src/main/settings/settings.cpp +++ b/src/main/settings/settings.cpp @@ -141,6 +141,22 @@ Value DebugForceNoCrossProduct::GetSetting(ClientContext &context) { return Value::BOOLEAN(ClientConfig::GetConfig(context).force_no_cross_product); } +//===--------------------------------------------------------------------===// +// Debug Ordered Aggregate Threshold +//===--------------------------------------------------------------------===// + +void DebugOrderedAggregateThreshold::ResetLocal(ClientContext &context) { + ClientConfig::GetConfig(context).ordered_aggregate_threshold = ClientConfig().ordered_aggregate_threshold; +} + +void DebugOrderedAggregateThreshold::SetLocal(ClientContext &context, const Value &input) { + ClientConfig::GetConfig(context).ordered_aggregate_threshold = input.GetValue(); +} + +Value DebugOrderedAggregateThreshold::GetSetting(ClientContext &context) { + return Value::UBIGINT(ClientConfig::GetConfig(context).ordered_aggregate_threshold); +} + //===--------------------------------------------------------------------===// // Debug Window Mode //===--------------------------------------------------------------------===// From b6c2e65846e700207adcf319355607ef05fccbbb Mon Sep 17 00:00:00 2001 From: Richard Wesley Date: Thu, 13 Apr 2023 15:25:31 -0700 Subject: [PATCH 7/9] Issue #5920: Ordered Aggregate Sorting * Fix count transfer in Combine * Pull trailing empty aggregates out of the loop * Fix pragma list test * Add value sanity check to benchmark. * Fix tidy formatting --- .../micro/aggregate/ordered_first.benchmark | 21 +++++++++-- .../aggregate/sorted_aggregate_function.cpp | 36 +++++++++++-------- test/api/test_reset.cpp | 1 + 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/benchmark/micro/aggregate/ordered_first.benchmark b/benchmark/micro/aggregate/ordered_first.benchmark index 99d08a20e6e..64a464d738c 100644 --- a/benchmark/micro/aggregate/ordered_first.benchmark +++ b/benchmark/micro/aggregate/ordered_first.benchmark @@ -1,11 +1,28 @@ -# name: benchmark/micro/aggregate/grouped_distinct.benchmark +# name: benchmark/micro/aggregate/ordered_first.benchmark # description: FIRST(i ORDER BY i) over a bunch of integers # group: [aggregate] name Ordered First (Grouped) group aggregate +#load +#PRAGMA debug_ordered_aggregate_threshold=262144 + run SELECT i % 2048 AS grp, FIRST(i ORDER BY i DESC) AS agg FROM range(10000000) tbl(i) -GROUP BY ALL; +GROUP BY ALL +ORDER BY ALL +LIMIT 10 + +result II +0 9998336 +1 9998337 +2 9998338 +3 9998339 +4 9998340 +5 9998341 +6 9998342 +7 9998343 +8 9998344 +9 9998345 diff --git a/src/function/aggregate/sorted_aggregate_function.cpp b/src/function/aggregate/sorted_aggregate_function.cpp index d89cc79d645..cc2c111e6fa 100644 --- a/src/function/aggregate/sorted_aggregate_function.cpp +++ b/src/function/aggregate/sorted_aggregate_function.cpp @@ -13,8 +13,8 @@ namespace duckdb { struct SortedAggregateBindData : public FunctionData { SortedAggregateBindData(ClientContext &context, BoundAggregateExpression &expr) : buffer_manager(BufferManager::GetBufferManager(context)), function(expr.function), - bind_info(std::move(expr.bind_info)), - threshold(ClientConfig::GetConfig(context).ordered_aggregate_threshold), external(ClientConfig::GetConfig(context).force_external) { + bind_info(std::move(expr.bind_info)), threshold(ClientConfig::GetConfig(context).ordered_aggregate_threshold), + external(ClientConfig::GetConfig(context).force_external) { auto &children = expr.children; arg_types.reserve(children.size()); for (const auto &child : children) { @@ -34,7 +34,8 @@ struct SortedAggregateBindData : public FunctionData { SortedAggregateBindData(const SortedAggregateBindData &other) : buffer_manager(other.buffer_manager), function(other.function), arg_types(other.arg_types), - sort_types(other.sort_types), sorted_on_args(other.sorted_on_args), threshold(other.threshold), external(other.external) { + sort_types(other.sort_types), sorted_on_args(other.sorted_on_args), threshold(other.threshold), + external(other.external) { if (other.bind_info) { bind_info = other.bind_info->Copy(); } @@ -188,10 +189,12 @@ struct SortedAggregateState { Flush(order_bind); ordering->Combine(*other.ordering); arguments->Combine(*other.arguments); + count += other.count; } else if (other.ordering) { // Force CDC if the other has it Flush(order_bind); ordering->Combine(*other.ordering); + count += other.count; } else if (other.sort_buffer.size()) { Update(order_bind, other.sort_buffer, other.arg_buffer); } @@ -421,19 +424,8 @@ struct SortedAggregateFunction { } // If they were all empty (filtering) flush them + // (This can only happen on the last range) if (!unsorted_count) { - // This can only happen on the last range, so get them all - for (; sorted < count; ++sorted) { - initialize(agg_state.data()); - - // Finalize a single value at the next offset - agg_state_vec.SetVectorType(states.GetVectorType()); - finalize(agg_state_vec, aggr_bind_info, result, 1, sorted + offset); - - if (destructor) { - destructor(agg_state_vec, aggr_bind_info, 1); - } - } break; } @@ -514,6 +506,20 @@ struct SortedAggregateFunction { state->Finalize(order_bind, prefixed, *local_sort); unsorted_count += state_unprocessed[finalized]; } + + for (; sorted < count; ++sorted) { + initialize(agg_state.data()); + + // Finalize a single value at the next offset + agg_state_vec.SetVectorType(states.GetVectorType()); + finalize(agg_state_vec, aggr_bind_info, result, 1, sorted + offset); + + if (destructor) { + destructor(agg_state_vec, aggr_bind_info, 1); + } + } + + result.Verify(count); } static void Serialize(FieldWriter &writer, const FunctionData *bind_data, const AggregateFunction &function) { diff --git a/test/api/test_reset.cpp b/test/api/test_reset.cpp index 2e7e0e41e44..daf8312f116 100644 --- a/test/api/test_reset.cpp +++ b/test/api/test_reset.cpp @@ -83,6 +83,7 @@ bool OptionIsExcludedFromTest(const string &name) { "search_path", "debug_force_external", "debug_force_no_cross_product", + "debug_ordered_aggregate_threshold", "debug_window_mode", "enable_external_access", // cant change this while db is running "allow_unsigned_extensions", // cant change this while db is running From fbfa892772845faf6e1ad8d7427c17255e876e22 Mon Sep 17 00:00:00 2001 From: Richard Wesley Date: Thu, 13 Apr 2023 20:21:08 -0700 Subject: [PATCH 8/9] Issue #5920: Ordered Aggregate Sorting Fix skipping of last solo aggregate. --- .../aggregate/sorted_aggregate_function.cpp | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/function/aggregate/sorted_aggregate_function.cpp b/src/function/aggregate/sorted_aggregate_function.cpp index cc2c111e6fa..39a1f1709d9 100644 --- a/src/function/aggregate/sorted_aggregate_function.cpp +++ b/src/function/aggregate/sorted_aggregate_function.cpp @@ -409,16 +409,16 @@ struct SortedAggregateFunction { // Go through the states accumulating values to sort until we hit the sort threshold idx_t unsorted_count = 0; idx_t sorted = 0; - for (idx_t finalized = 0; finalized < count; ++finalized) { - auto state = sdata[finalized]; + for (idx_t finalized = 0; finalized < count;) { if (unsorted_count < order_bind.threshold) { + auto state = sdata[finalized]; prefixed.Reset(); prefixed.data[0].Reference(Value::USMALLINT(finalized)); state->Finalize(order_bind, prefixed, *local_sort); unsorted_count += state_unprocessed[finalized]; // Go to the next aggregate unless this is the last one - if (finalized + 1 < count) { + if (++finalized < count) { continue; } } @@ -488,7 +488,7 @@ struct SortedAggregateFunction { ++sorted; // Stop if we are done - if (finalized + 1 >= count) { + if (finalized >= count) { break; } @@ -499,12 +499,6 @@ struct SortedAggregateFunction { local_sort = make_uniq(); local_sort->Initialize(*global_sort, global_sort->buffer_manager); unsorted_count = 0; - - // Add the current values to it - prefixed.Reset(); - prefixed.data[0].Reference(Value::USMALLINT(finalized)); - state->Finalize(order_bind, prefixed, *local_sort); - unsorted_count += state_unprocessed[finalized]; } for (; sorted < count; ++sorted) { From 0e6506828d08ed71f949f54059e44daea8134b1a Mon Sep 17 00:00:00 2001 From: Richard Wesley Date: Fri, 14 Apr 2023 11:02:45 -0700 Subject: [PATCH 9/9] Issue #5920: Ordered Aggregate Sorting * Convert config to non-debug * Switch benchmark to division and a checksum --- .../micro/aggregate/ordered_first.benchmark | 25 ++++++------------- src/include/duckdb/main/client_config.hpp | 2 +- src/include/duckdb/main/settings.hpp | 7 +++--- src/main/config.cpp | 2 +- src/main/settings/settings.cpp | 12 ++++++--- test/api/test_reset.cpp | 2 +- 6 files changed, 22 insertions(+), 28 deletions(-) diff --git a/benchmark/micro/aggregate/ordered_first.benchmark b/benchmark/micro/aggregate/ordered_first.benchmark index 64a464d738c..7f28f6c521b 100644 --- a/benchmark/micro/aggregate/ordered_first.benchmark +++ b/benchmark/micro/aggregate/ordered_first.benchmark @@ -6,23 +6,14 @@ name Ordered First (Grouped) group aggregate #load -#PRAGMA debug_ordered_aggregate_threshold=262144 +#PRAGMA ordered_aggregate_threshold=262144 run -SELECT i % 2048 AS grp, FIRST(i ORDER BY i DESC) AS agg -FROM range(10000000) tbl(i) -GROUP BY ALL -ORDER BY ALL -LIMIT 10 +SELECT SUM(agg) FROM ( + SELECT i / 2048 AS grp, FIRST(i ORDER BY i DESC) AS agg + FROM range(10000000) tbl(i) + GROUP BY ALL +) -result II -0 9998336 -1 9998337 -2 9998338 -3 9998339 -4 9998340 -5 9998341 -6 9998342 -7 9998343 -8 9998344 -9 9998345 +result I +24420932461 diff --git a/src/include/duckdb/main/client_config.hpp b/src/include/duckdb/main/client_config.hpp index 9efd8d816a8..c46518d7732 100644 --- a/src/include/duckdb/main/client_config.hpp +++ b/src/include/duckdb/main/client_config.hpp @@ -77,7 +77,7 @@ struct ClientConfig { //! Maximum bits allowed for using a perfect hash table (i.e. the perfect HT can hold up to 2^perfect_ht_threshold //! elements) idx_t perfect_ht_threshold = 12; - //! The maximum expression depth limit in the parser + //! The maximum number of rows to accumulate before sorting ordered aggregates. idx_t ordered_aggregate_threshold = (idx_t(1) << 18); //! Callback to create a progress bar display diff --git a/src/include/duckdb/main/settings.hpp b/src/include/duckdb/main/settings.hpp index 7164bfe9e17..60e2b92f64f 100644 --- a/src/include/duckdb/main/settings.hpp +++ b/src/include/duckdb/main/settings.hpp @@ -65,10 +65,9 @@ struct DebugForceNoCrossProduct { static Value GetSetting(ClientContext &context); }; -struct DebugOrderedAggregateThreshold { - static constexpr const char *Name = "debug_ordered_aggregate_threshold"; - static constexpr const char *Description = - "DEBUG SETTING: the number of rows to accumulate before sorting, used for tuning"; +struct OrderedAggregateThreshold { + static constexpr const char *Name = "ordered_aggregate_threshold"; + static constexpr const char *Description = "the number of rows to accumulate before sorting, used for tuning"; static constexpr const LogicalTypeId InputType = LogicalTypeId::UBIGINT; static void SetLocal(ClientContext &context, const Value ¶meter); static void ResetLocal(ClientContext &context); diff --git a/src/main/config.cpp b/src/main/config.cpp index f34ae75b19b..9d4c592668c 100644 --- a/src/main/config.cpp +++ b/src/main/config.cpp @@ -54,7 +54,6 @@ static ConfigurationOption internal_options[] = {DUCKDB_GLOBAL(AccessModeSetting DUCKDB_GLOBAL(DebugCheckpointAbort), DUCKDB_LOCAL(DebugForceExternal), DUCKDB_LOCAL(DebugForceNoCrossProduct), - DUCKDB_LOCAL(DebugOrderedAggregateThreshold), DUCKDB_GLOBAL(DebugWindowMode), DUCKDB_GLOBAL_LOCAL(DefaultCollationSetting), DUCKDB_GLOBAL(DefaultOrderSetting), @@ -83,6 +82,7 @@ static ConfigurationOption internal_options[] = {DUCKDB_GLOBAL(AccessModeSetting DUCKDB_GLOBAL(MaximumMemorySetting), DUCKDB_GLOBAL_ALIAS("memory_limit", MaximumMemorySetting), DUCKDB_GLOBAL_ALIAS("null_order", DefaultNullOrderSetting), + DUCKDB_LOCAL(OrderedAggregateThreshold), DUCKDB_GLOBAL(PasswordSetting), DUCKDB_LOCAL(PerfectHashThresholdSetting), DUCKDB_LOCAL(PivotLimitSetting), diff --git a/src/main/settings/settings.cpp b/src/main/settings/settings.cpp index af11e7d5ed7..26710333eae 100644 --- a/src/main/settings/settings.cpp +++ b/src/main/settings/settings.cpp @@ -145,15 +145,19 @@ Value DebugForceNoCrossProduct::GetSetting(ClientContext &context) { // Debug Ordered Aggregate Threshold //===--------------------------------------------------------------------===// -void DebugOrderedAggregateThreshold::ResetLocal(ClientContext &context) { +void OrderedAggregateThreshold::ResetLocal(ClientContext &context) { ClientConfig::GetConfig(context).ordered_aggregate_threshold = ClientConfig().ordered_aggregate_threshold; } -void DebugOrderedAggregateThreshold::SetLocal(ClientContext &context, const Value &input) { - ClientConfig::GetConfig(context).ordered_aggregate_threshold = input.GetValue(); +void OrderedAggregateThreshold::SetLocal(ClientContext &context, const Value &input) { + const auto param = input.GetValue(); + if (!param) { + throw ParserException("Invalid option for PRAGMA ordered_aggregate_threshold, value must be positive"); + } + ClientConfig::GetConfig(context).ordered_aggregate_threshold = param; } -Value DebugOrderedAggregateThreshold::GetSetting(ClientContext &context) { +Value OrderedAggregateThreshold::GetSetting(ClientContext &context) { return Value::UBIGINT(ClientConfig::GetConfig(context).ordered_aggregate_threshold); } diff --git a/test/api/test_reset.cpp b/test/api/test_reset.cpp index daf8312f116..3e81866ddce 100644 --- a/test/api/test_reset.cpp +++ b/test/api/test_reset.cpp @@ -54,6 +54,7 @@ OptionValuePair &GetValueForOption(const string &name) { {"max_expression_depth", {50}}, {"max_memory", {"4.2GB"}}, {"memory_limit", {"4.2GB"}}, + {"ordered_aggregate_threshold", {Value::UBIGINT(idx_t(1) << 12)}}, {"null_order", {"nulls_last"}}, {"perfect_ht_threshold", {0}}, {"pivot_limit", {999}}, @@ -83,7 +84,6 @@ bool OptionIsExcludedFromTest(const string &name) { "search_path", "debug_force_external", "debug_force_no_cross_product", - "debug_ordered_aggregate_threshold", "debug_window_mode", "enable_external_access", // cant change this while db is running "allow_unsigned_extensions", // cant change this while db is running