diff --git a/src/core_functions/aggregate/holistic/mode.cpp b/src/core_functions/aggregate/holistic/mode.cpp index 0f81b88c920..5ee8a16901c 100644 --- a/src/core_functions/aggregate/holistic/mode.cpp +++ b/src/core_functions/aggregate/holistic/mode.cpp @@ -41,6 +41,7 @@ struct ModeState { }; using Counts = unordered_map; + FrameBounds prev; Counts *frequency_map; KEY_TYPE *mode; size_t nonzero; @@ -209,17 +210,18 @@ struct ModeFunction { template static void Window(const INPUT_TYPE *data, const ValidityMask &fmask, const ValidityMask &dmask, - AggregateInputData &, STATE &state, const FrameBounds &frame, const FrameBounds &prev, - Vector &result, idx_t rid, idx_t bias) { + AggregateInputData &aggr_input_data, STATE &state, const FrameBounds &frame, Vector &result, + idx_t rid, const STATE *gstate) { auto rdata = FlatVector::GetData(result); auto &rmask = FlatVector::Validity(result); - ModeIncluded included(fmask, dmask, bias); + ModeIncluded included(fmask, dmask, 0); if (!state.frequency_map) { state.frequency_map = new typename STATE::Counts; } const double tau = .25; + auto &prev = state.prev; if (state.nonzero <= tau * state.frequency_map->size() || prev.end <= frame.start || frame.end <= prev.start) { state.Reset(); // for f ∈ F do @@ -269,6 +271,8 @@ struct ModeFunction { } else { rmask.Set(rid, false); } + + prev = frame; } static bool IgnoreNull() { diff --git a/src/core_functions/aggregate/holistic/quantile.cpp b/src/core_functions/aggregate/holistic/quantile.cpp index 045ae65f1fd..ec365ed549d 100644 --- a/src/core_functions/aggregate/holistic/quantile.cpp +++ b/src/core_functions/aggregate/holistic/quantile.cpp @@ -1,5 +1,6 @@ #include "duckdb/execution/expression_executor.hpp" #include "duckdb/core_functions/aggregate/holistic_functions.hpp" +#include "duckdb/execution/merge_sort_tree.hpp" #include "duckdb/planner/expression.hpp" #include "duckdb/common/operator/cast_operators.hpp" #include "duckdb/common/operator/abs.hpp" @@ -11,6 +12,7 @@ #include "duckdb/common/serializer/deserializer.hpp" #include +#include #include #include @@ -36,33 +38,120 @@ inline interval_t operator-(const interval_t &lhs, const interval_t &rhs) { return Interval::FromMicro(Interval::GetMicro(lhs) - Interval::GetMicro(rhs)); } -template -struct QuantileState { - using SaveType = SAVE_TYPE; +struct QuantileSortTree : public MergeSortTree, 64, 64> { + explicit QuantileSortTree(Elements &&lowest_level) : MergeSortTree(std::move(lowest_level)) { + } - // Regular aggregation - vector v; + size_t SelectNth(const FrameBounds &frame, ElementType n) const; +}; - // Windowed Quantile indirection - vector w; - idx_t pos; +size_t QuantileSortTree::SelectNth(const FrameBounds &frame, ElementType n) const { + // Empty frames should be handled by the caller + D_ASSERT(frame.start < frame.end); + + // Handle special case of a one-element tree + if (tree.size() < 2) { + return 0; + } + + // The first level contains a single run, + // so the only thing we need is any cascading pointers + auto level_no = tree.size() - 2; + auto level_width = 1; + for (size_t i = 0; i < level_no; ++i) { + level_width *= FANOUT; + } + + // Find Nth element in a top-down traversal + size_t result = 0; + + // First, handle levels with cascading pointers + const auto min_cascaded = LowestCascadingLevel(); + if (level_no > min_cascaded) { + // Initialise the cascade indicies from the previous level + pair cascade_idx; + { + const auto &level = tree[level_no + 1].first; + const auto lower_idx = std::lower_bound(level.begin(), level.end(), frame.start) - level.begin(); + cascade_idx.first = lower_idx / CASCADING * FANOUT; + const auto upper_idx = std::lower_bound(level.begin(), level.end(), frame.end) - level.begin(); + cascade_idx.second = upper_idx / CASCADING * FANOUT; + } - // Windowed MAD indirection - vector m; + // Walk the cascaded levels + for (; level_no >= min_cascaded; --level_no) { + // The cascade indicies into this level are in the previous level + const auto &level_cascades = tree[level_no + 1].second; + + // Go over all children until we found enough in range + const auto *level_data = tree[level_no].first.data(); + while (true) { + const auto lower_begin = level_data + level_cascades[cascade_idx.first]; + const auto lower_end = level_data + level_cascades[cascade_idx.first + FANOUT]; + const auto lower_idx = std::lower_bound(lower_begin, lower_end, frame.start) - level_data; + + const auto upper_begin = level_data + level_cascades[cascade_idx.second]; + const auto upper_end = level_data + level_cascades[cascade_idx.second + FANOUT]; + const auto upper_idx = std::lower_bound(upper_begin, upper_end, frame.end) - level_data; + + const auto matched = upper_idx - lower_idx; + if (matched > n) { + // Enough in this level, so move down to leftmost child candidate within the cascade range + cascade_idx.first = (lower_idx / CASCADING + 2 * result) * FANOUT; + cascade_idx.second = (upper_idx / CASCADING + 2 * result) * FANOUT; + result *= FANOUT; + level_width /= FANOUT; + --level_no; + break; + } - QuantileState() : pos(0) { + // Not enough in this child, so move right + ++cascade_idx.first; + ++cascade_idx.second; + ++result; + n -= matched; + } + } } - ~QuantileState() { + // Continue with the uncascaded levels (except the first) + for (; level_no > 0; --level_no) { + const auto &level = tree[level_no].first; + auto range_begin = level.begin() + result * level_width; + auto range_end = range_begin + level_width; + while (range_end < level.end()) { + const auto lower_match = std::lower_bound(range_begin, range_end, frame.start); + const auto upper_match = std::lower_bound(lower_match, range_end, frame.end); + const auto matched = upper_match - lower_match; + if (matched > n) { + // Enough in this level, so move down to leftmost child candidate + // Since we have no cascade pointers left, this is just the start of the next level. + result *= FANOUT; + level_width /= FANOUT; + break; + } + // Not enough in this child, so move right + range_begin = range_end; + range_end += level_width; + ++result; + n -= matched; + } } - inline void SetPos(size_t pos_p) { - pos = pos_p; - if (pos >= w.size()) { - w.resize(pos); + // The last level + const auto *level_data = tree[level_no].first.data(); + ++n; + while (true) { + const auto v = level_data[result]; + n -= (v >= frame.start) && (v < frame.end); + if (!n) { + break; } + ++result; } -}; + + return result; +} struct QuantileIncluded { inline explicit QuantileIncluded(const ValidityMask &fmask_p, const ValidityMask &dmask_p, idx_t bias_p) @@ -82,6 +171,38 @@ struct QuantileIncluded { const idx_t bias; }; +template +struct QuantileState { + using SaveType = SAVE_TYPE; + + // Regular aggregation + vector v; + + // Windowed Quantile indirection + FrameBounds prev; + vector w; + idx_t pos; + + // Windowed Quantile merge sort tree + unique_ptr qst; + + // Windowed MAD indirection + vector m; + + QuantileState() : pos(0) { + } + + ~QuantileState() { + } + + inline void SetPos(size_t pos_p) { + pos = pos_p; + if (pos >= w.size()) { + w.resize(pos); + } + } +}; + void ReuseIndexes(idx_t *index, const FrameBounds &frame, const FrameBounds &prev) { idx_t j = 0; @@ -558,6 +679,38 @@ struct QuantileOperation { static bool IgnoreNull() { return true; } + + template + static void WindowInit(Vector inputs[], AggregateInputData &aggr_input_data, idx_t input_count, + const ValidityMask &filter_mask, data_ptr_t state_p, idx_t count) { + D_ASSERT(input_count == 1); + const auto data = FlatVector::GetData(inputs[0]); + const auto &data_mask = FlatVector::Validity(inputs[0]); + + // Build the indirection array + using ElementType = typename QuantileSortTree::ElementType; + vector sorted(count); + if (filter_mask.AllValid() && data_mask.AllValid()) { + std::iota(sorted.begin(), sorted.end(), 0); + } else { + size_t valid = 0; + QuantileIncluded included(filter_mask, data_mask, 0); + for (ElementType i = 0; i < count; ++i) { + if (included(i)) { + sorted[valid++] = i; + } + } + sorted.resize(valid); + } + + // Sort it + std::sort(sorted.begin(), sorted.end(), + [&](const ElementType &lhs, const ElementType &rhs) { return data[lhs] < data[rhs]; }); + + // Build the tree + auto &state = *reinterpret_cast(state_p); + state.qst = make_uniq(std::move(sorted)); + } }; template @@ -588,12 +741,12 @@ struct QuantileScalarOperation : public QuantileOperation { template static void Window(const INPUT_TYPE *data, const ValidityMask &fmask, const ValidityMask &dmask, - AggregateInputData &aggr_input_data, STATE &state, const FrameBounds &frame, - const FrameBounds &prev, Vector &result, idx_t ridx, idx_t bias) { + AggregateInputData &aggr_input_data, STATE &state, const FrameBounds &frame, Vector &result, + idx_t ridx, const STATE *gstate) { auto rdata = FlatVector::GetData(result); auto &rmask = FlatVector::Validity(result); - QuantileIncluded included(fmask, dmask, bias); + QuantileIncluded included(fmask, dmask, 0); // Lazily initialise frame state auto prev_pos = state.pos; @@ -609,6 +762,7 @@ struct QuantileScalarOperation : public QuantileOperation { const auto &q = bind_data.quantiles[0]; bool replace = false; + auto &prev = state.prev; if (frame.start == prev.start + 1 && frame.end == prev.end + 1) { // Fixed frame size const auto j = ReplaceIndex(index, frame, prev); @@ -638,6 +792,8 @@ struct QuantileScalarOperation : public QuantileOperation { } else { rmask.Set(ridx, false); } + + prev = frame; } }; @@ -647,6 +803,7 @@ AggregateFunction GetTypedDiscreteQuantileAggregateFunction(const LogicalType &t using OP = QuantileScalarOperation; auto fun = AggregateFunction::UnaryAggregateDestructor(type, type); fun.window = AggregateFunction::UnaryWindow; + fun.wininit = OP::WindowInit; return fun; } @@ -736,12 +893,12 @@ struct QuantileListOperation : public QuantileOperation { template static void Window(const INPUT_TYPE *data, const ValidityMask &fmask, const ValidityMask &dmask, - AggregateInputData &aggr_input_data, STATE &state, const FrameBounds &frame, - const FrameBounds &prev, Vector &list, idx_t lidx, idx_t bias) { + AggregateInputData &aggr_input_data, STATE &state, const FrameBounds &frame, Vector &list, + idx_t lidx, const STATE *gstate) { D_ASSERT(aggr_input_data.bind_data); auto &bind_data = aggr_input_data.bind_data->Cast(); - QuantileIncluded included(fmask, dmask, bias); + QuantileIncluded included(fmask, dmask, 0); // Result is a constant LIST with a fixed length auto ldata = FlatVector::GetData(list); @@ -760,6 +917,7 @@ struct QuantileListOperation : public QuantileOperation { state.SetPos(frame.end - frame.start); auto index = state.w.data(); + auto &prev = state.prev; // We can generalise replacement for quantile lists by observing that when a replacement is // valid for a single quantile, it is valid for all quantiles greater/less than that quantile @@ -826,6 +984,8 @@ struct QuantileListOperation : public QuantileOperation { } else { lmask.Set(lidx, false); } + + prev = frame; } }; @@ -836,6 +996,7 @@ AggregateFunction GetTypedDiscreteQuantileListAggregateFunction(const LogicalTyp auto fun = QuantileListAggregate(type, type); fun.order_dependent = AggregateOrderDependent::NOT_ORDER_DEPENDENT; fun.window = AggregateFunction::UnaryWindow; + fun.wininit = OP::template WindowInit; return fun; } @@ -893,6 +1054,7 @@ AggregateFunction GetTypedContinuousQuantileAggregateFunction(const LogicalType auto fun = AggregateFunction::UnaryAggregateDestructor(input_type, target_type); fun.order_dependent = AggregateOrderDependent::NOT_ORDER_DEPENDENT; fun.window = AggregateFunction::UnaryWindow; + fun.wininit = OP::template WindowInit; return fun; } @@ -947,6 +1109,7 @@ AggregateFunction GetTypedContinuousQuantileListAggregateFunction(const LogicalT auto fun = QuantileListAggregate(input_type, result_type); fun.order_dependent = AggregateOrderDependent::NOT_ORDER_DEPENDENT; fun.window = AggregateFunction::UnaryWindow; + fun.wininit = OP::template WindowInit; return fun; } @@ -1094,12 +1257,12 @@ struct MedianAbsoluteDeviationOperation : public QuantileOperation { template static void Window(const INPUT_TYPE *data, const ValidityMask &fmask, const ValidityMask &dmask, - AggregateInputData &aggr_input_data, STATE &state, const FrameBounds &frame, - const FrameBounds &prev, Vector &result, idx_t ridx, idx_t bias) { + AggregateInputData &aggr_input_data, STATE &state, const FrameBounds &frame, Vector &result, + idx_t ridx, const STATE *gstate) { auto rdata = FlatVector::GetData(result); auto &rmask = FlatVector::Validity(result); - QuantileIncluded included(fmask, dmask, bias); + QuantileIncluded included(fmask, dmask, 0); // Lazily initialise frame state auto prev_pos = state.pos; @@ -1119,6 +1282,7 @@ struct MedianAbsoluteDeviationOperation : public QuantileOperation { // The replacement trick does not work on the second index because if // the median has changed, the previous order is not correct. // It is probably close, however, and so reuse is helpful. + auto &prev = state.prev; ReuseIndexes(index2, frame, prev); std::partition(index2, index2 + state.pos, included); @@ -1168,6 +1332,8 @@ struct MedianAbsoluteDeviationOperation : public QuantileOperation { } else { rmask.Set(ridx, false); } + + prev = frame; } }; @@ -1185,6 +1351,7 @@ AggregateFunction GetTypedMedianAbsoluteDeviationAggregateFunction(const Logical fun.bind = BindMedian; fun.order_dependent = AggregateOrderDependent::NOT_ORDER_DEPENDENT; fun.window = AggregateFunction::UnaryWindow; + fun.wininit = OP::template WindowInit; return fun; } diff --git a/src/core_functions/aggregate/nested/list.cpp b/src/core_functions/aggregate/nested/list.cpp index a7d4d7428f7..5f75c6583da 100644 --- a/src/core_functions/aggregate/nested/list.cpp +++ b/src/core_functions/aggregate/nested/list.cpp @@ -148,8 +148,8 @@ static void ListFinalize(Vector &states_vector, AggregateInputData &aggr_input_d } static void ListWindow(Vector inputs[], const ValidityMask &filter_mask, AggregateInputData &aggr_input_data, - idx_t input_count, data_ptr_t state, const FrameBounds &frame, const FrameBounds &prev, - Vector &result, idx_t rid, idx_t bias) { + idx_t input_count, data_ptr_t state, const FrameBounds &frame, Vector &result, idx_t rid, + const_data_ptr_t) { auto &list_bind_data = aggr_input_data.bind_data->Cast(); LinkedList linked_list; diff --git a/src/execution/window_segment_tree.cpp b/src/execution/window_segment_tree.cpp index 2a66399c9e4..52263e79b9e 100644 --- a/src/execution/window_segment_tree.cpp +++ b/src/execution/window_segment_tree.cpp @@ -293,13 +293,12 @@ void WindowCustomAggregator::Evaluate(WindowAggregatorState &lstate, const idx_t } // Frame boundaries - auto prev = frame; frame = FrameBounds(begin, end); // Extract the range AggregateInputData aggr_input_data(aggr.GetFunctionData(), lstate.allocator); aggr.function.window(params, filter_mask, aggr_input_data, inputs.ColumnCount(), lcstate.state.data(), frame, - prev, result, i, 0); + result, i, nullptr); } } diff --git a/src/function/aggregate/distributive/count.cpp b/src/function/aggregate/distributive/count.cpp index ec9d705bb89..161d7b58b5b 100644 --- a/src/function/aggregate/distributive/count.cpp +++ b/src/function/aggregate/distributive/count.cpp @@ -35,8 +35,8 @@ struct CountStarFunction : public BaseCountFunction { template static void Window(Vector inputs[], const ValidityMask &filter_mask, AggregateInputData &aggr_input_data, - idx_t input_count, data_ptr_t state, const FrameBounds &frame, const FrameBounds &prev, - Vector &result, idx_t rid, idx_t bias) { + idx_t input_count, data_ptr_t state, const FrameBounds &frame, Vector &result, idx_t rid, + const_data_ptr_t) { D_ASSERT(input_count == 0); auto data = FlatVector::GetData(result); const auto begin = frame.start; diff --git a/src/function/aggregate/sorted_aggregate_function.cpp b/src/function/aggregate/sorted_aggregate_function.cpp index 6e5f2196259..b9b226a60c0 100644 --- a/src/function/aggregate/sorted_aggregate_function.cpp +++ b/src/function/aggregate/sorted_aggregate_function.cpp @@ -352,8 +352,8 @@ struct SortedAggregateFunction { } static void Window(Vector inputs[], const ValidityMask &filter_mask, AggregateInputData &aggr_input_data, - idx_t input_count, data_ptr_t state, const FrameBounds &frame, const FrameBounds &prev, - Vector &result, idx_t rid, idx_t bias) { + idx_t input_count, data_ptr_t state, const FrameBounds &frame, Vector &result, idx_t rid, + const_data_ptr_t) { throw InternalException("Sorted aggregates should not be generated for window clauses"); } diff --git a/src/include/duckdb/common/vector_operations/aggregate_executor.hpp b/src/include/duckdb/common/vector_operations/aggregate_executor.hpp index 4a80ac81942..645b544a13d 100644 --- a/src/include/duckdb/common/vector_operations/aggregate_executor.hpp +++ b/src/include/duckdb/common/vector_operations/aggregate_executor.hpp @@ -384,13 +384,15 @@ class AggregateExecutor { template static void UnaryWindow(Vector &input, const ValidityMask &ifilter, AggregateInputData &aggr_input_data, - data_ptr_t state, const FrameBounds &frame, const FrameBounds &prev, Vector &result, - idx_t rid, idx_t bias) { + data_ptr_t state_p, const FrameBounds &frame, Vector &result, idx_t ridx, + const_data_ptr_t gstate_p) { - auto idata = FlatVector::GetData(input) - bias; + auto idata = FlatVector::GetData(input); const auto &ivalid = FlatVector::Validity(input); - OP::template Window( - idata, ifilter, ivalid, aggr_input_data, *reinterpret_cast(state), frame, prev, result, rid, bias); + auto &state = *reinterpret_cast(state_p); + auto gstate = reinterpret_cast(gstate_p); + OP::template Window(idata, ifilter, ivalid, aggr_input_data, state, frame, + result, ridx, gstate); } template diff --git a/src/include/duckdb/execution/merge_sort_tree.hpp b/src/include/duckdb/execution/merge_sort_tree.hpp index 9ce82e4bd89..f7c7578b330 100644 --- a/src/include/duckdb/execution/merge_sort_tree.hpp +++ b/src/include/duckdb/execution/merge_sort_tree.hpp @@ -9,8 +9,9 @@ #pragma once #include "duckdb/common/array.hpp" -#include "duckdb/common/typedefs.hpp" +#include "duckdb/common/helper.hpp" #include "duckdb/common/pair.hpp" +#include "duckdb/common/typedefs.hpp" #include "duckdb/common/vector.hpp" namespace duckdb { @@ -48,7 +49,7 @@ struct MergeSortTree { MergeSortTree() { } - explicit MergeSortTree(Elements &&lowest_level, const CMP &cmp); + explicit MergeSortTree(Elements &&lowest_level, const CMP &cmp = CMP()); RunElement StartGames(Games &losers, const RunElements &elements, const RunElement &sentinel) { const auto elem_nodes = elements.size(); @@ -124,6 +125,19 @@ struct MergeSortTree { Tree tree; CompareElements cmp; + + static constexpr auto FANOUT = F; + static constexpr auto CASCADING = C; + + static size_t LowestCascadingLevel() { + size_t level = 0; + size_t level_width = 1; + while (level_width <= CASCADING) { + ++level; + level_width *= FANOUT; + } + return level; + } }; template @@ -131,13 +145,13 @@ MergeSortTree::MergeSortTree(Elements &&lowest_level, const CMP const auto fanout = F; const auto cascading = C; const auto count = lowest_level.size(); - tree.emplace_back({lowest_level, Offsets()}); + tree.emplace_back(Level(lowest_level, Offsets())); - constexpr RunElement SENTINEL(std::numeric_limits::max(), std::numeric_limits::max()); + const RunElement SENTINEL(std::numeric_limits::max(), std::numeric_limits::max()); // Fan in parent levels until we are at the top // Note that we don't build the top layer as that would just be all the data. - for (idx_t child_run_length = 1; child_run_length < count;) { + for (size_t child_run_length = 1; child_run_length < count;) { const auto run_length = child_run_length * fanout; const auto num_runs = (count + run_length - 1) / run_length; @@ -155,7 +169,7 @@ MergeSortTree::MergeSortTree(Elements &&lowest_level, const CMP // https://en.wikipedia.org/wiki/K-way_merge_algorithm // TODO: Because the runs are independent, they can be parallelised with parallel_for const auto &child_level = tree.back(); - for (idx_t run_idx = 0; run_idx < num_runs; ++run_idx) { + for (size_t run_idx = 0; run_idx < num_runs; ++run_idx) { // Position markers for scanning the children. using Bounds = pair; array bounds; @@ -164,7 +178,8 @@ MergeSortTree::MergeSortTree(Elements &&lowest_level, const CMP const auto child_base = run_idx * run_length; for (size_t child_run = 0; child_run < fanout; ++child_run) { const auto child_idx = child_base + child_run * child_run_length; - bounds[child_run] = {MinValue(child_idx, count), MinValue(child_idx + child_run_length, count)}; + bounds[child_run] = {MinValue(child_idx, count), + MinValue(child_idx + child_run_length, count)}; if (bounds[child_run].first != bounds[child_run].second) { players[child_run] = {child_level.first[child_idx], child_run}; } else { @@ -174,7 +189,7 @@ MergeSortTree::MergeSortTree(Elements &&lowest_level, const CMP } // Play the first round and extract the winner - RunElements games; + Games games; auto winner = StartGames(games, players, SENTINEL); while (winner != SENTINEL) { // Add fractional cascading pointers @@ -210,7 +225,7 @@ MergeSortTree::MergeSortTree(Elements &&lowest_level, const CMP } // Insert completed level and move up to the next one - tree.emplace_back(move(elements), move(cascades)); + tree.emplace_back(std::move(elements), std::move(cascades)); child_run_length = run_length; } } diff --git a/src/include/duckdb/function/aggregate_function.hpp b/src/include/duckdb/function/aggregate_function.hpp index 56b712a3117..81221b6051b 100644 --- a/src/include/duckdb/function/aggregate_function.hpp +++ b/src/include/duckdb/function/aggregate_function.hpp @@ -40,11 +40,14 @@ typedef void (*aggregate_destructor_t)(Vector &state, AggregateInputData &aggr_i typedef void (*aggregate_simple_update_t)(Vector inputs[], AggregateInputData &aggr_input_data, idx_t input_count, data_ptr_t state, idx_t count); -//! The type used for updating complex windowed aggregate functions (optional) +//! The type used for computing complex/custom windowed aggregate functions (optional) typedef void (*aggregate_window_t)(Vector inputs[], const ValidityMask &filter_mask, AggregateInputData &aggr_input_data, idx_t input_count, data_ptr_t state, - const FrameBounds &frame, const FrameBounds &prev, Vector &result, idx_t rid, - idx_t bias); + const FrameBounds &frame, Vector &result, idx_t rid, const_data_ptr_t win_state); + +//! The type used for initializing shared complex/custom windowed aggregate state (optional) +typedef void (*aggregate_wininit_t)(Vector inputs[], AggregateInputData &aggr_input_data, idx_t input_count, + const ValidityMask &filter_mask, data_ptr_t win_state, idx_t count); typedef void (*aggregate_serialize_t)(Serializer &serializer, const optional_ptr bind_data, const AggregateFunction &function); @@ -116,8 +119,10 @@ class AggregateFunction : public BaseScalarFunction { aggregate_finalize_t finalize; //! The simple aggregate update function (may be null) aggregate_simple_update_t simple_update; - //! The windowed aggregate frame update function (may be null) + //! The windowed aggregate custom function (may be null) aggregate_window_t window; + //! The windowed aggregate custom initialization function (may be null) + aggregate_wininit_t wininit; //! The bind function (may be null) bind_aggregate_function_t bind; @@ -219,11 +224,11 @@ class AggregateFunction : public BaseScalarFunction { template static void UnaryWindow(Vector inputs[], const ValidityMask &filter_mask, AggregateInputData &aggr_input_data, - idx_t input_count, data_ptr_t state, const FrameBounds &frame, const FrameBounds &prev, - Vector &result, idx_t rid, idx_t bias) { + idx_t input_count, data_ptr_t state, const FrameBounds &frame, Vector &result, idx_t ridx, + const_data_ptr_t gstate) { D_ASSERT(input_count == 1); AggregateExecutor::UnaryWindow(inputs[0], filter_mask, aggr_input_data, - state, frame, prev, result, rid, bias); + state, frame, result, ridx, gstate); } template