Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "4-dev4188"
#define DUCKDB_PATCH_VERSION "4-dev4200"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 1
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.1.4-dev4188"
#define DUCKDB_VERSION "v1.1.4-dev4200"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "fbc4d92fe6"
#define DUCKDB_SOURCE_ID "3b44a57608"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
127 changes: 30 additions & 97 deletions src/duckdb/src/function/window/window_aggregate_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,117 +26,50 @@ class WindowAggregateExecutorGlobalState : public WindowExecutorGlobalState {
const Expression *filter_ref;
};

bool WindowAggregateExecutor::IsConstantAggregate() {
if (!wexpr.aggregate) {
return false;
}
// window exclusion cannot be handled by constant aggregates
if (wexpr.exclude_clause != WindowExcludeMode::NO_OTHER) {
return false;
}

// COUNT(*) is already handled efficiently by segment trees.
if (wexpr.children.empty()) {
return false;
}

/*
The default framing option is RANGE UNBOUNDED PRECEDING, which
is the same as RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW; it sets the frame to be all rows from the partition start
up through the current row's last peer (a row that the window's
ORDER BY clause considers equivalent to the current row; all
rows are peers if there is no ORDER BY). In general, UNBOUNDED
PRECEDING means that the frame starts with the first row of the
partition, and similarly UNBOUNDED FOLLOWING means that the
frame ends with the last row of the partition, regardless of
RANGE, ROWS or GROUPS mode. In ROWS mode, CURRENT ROW means that
the frame starts or ends with the current row; but in RANGE or
GROUPS mode it means that the frame starts or ends with the
current row's first or last peer in the ORDER BY ordering. The
offset PRECEDING and offset FOLLOWING options vary in meaning
depending on the frame mode.
*/
switch (wexpr.start) {
case WindowBoundary::UNBOUNDED_PRECEDING:
break;
case WindowBoundary::CURRENT_ROW_RANGE:
if (!wexpr.orders.empty()) {
return false;
static BoundWindowExpression &SimplifyWindowedAggregate(BoundWindowExpression &wexpr, ClientContext &context) {
// Remove redundant/irrelevant modifiers (they can be serious performance cliffs)
if (wexpr.aggregate && ClientConfig::GetConfig(context).enable_optimizer) {
const auto &aggr = wexpr.aggregate;
auto &arg_orders = wexpr.arg_orders;
if (aggr->distinct_dependent != AggregateDistinctDependent::DISTINCT_DEPENDENT) {
wexpr.distinct = false;
}
break;
default:
return false;
}

switch (wexpr.end) {
case WindowBoundary::UNBOUNDED_FOLLOWING:
break;
case WindowBoundary::CURRENT_ROW_RANGE:
if (!wexpr.orders.empty()) {
return false;
if (aggr->order_dependent != AggregateOrderDependent::ORDER_DEPENDENT) {
arg_orders.clear();
} else {
// If the argument ordering is a prefix of the partition ordering,
// then we can just use the partition ordering.
if (BoundWindowExpression::GetSharedOrders(wexpr.orders, arg_orders) == arg_orders.size()) {
arg_orders.clear();
}
}
break;
default:
return false;
}

return true;
}

//! Reports whether the window expression is an aggregate carrying the DISTINCT flag.
//! Non-aggregate window expressions always yield false.
bool WindowAggregateExecutor::IsDistinctAggregate() {
	// The distinct flag is only consulted when an aggregate is actually bound
	return wexpr.aggregate ? wexpr.distinct : false;
}

//! Reports whether the expression is an aggregate that provides its own window
//! callback and the executor's aggregation mode is below COMBINE.
bool WindowAggregateExecutor::IsCustomAggregate() {
	// Only aggregates can provide a window callback
	if (wexpr.aggregate) {
		// The aggregate has to supply a window implementation...
		if (AggregateObject(wexpr).function.window) {
			// ...and the mode must still allow it to be used
			return mode < WindowAggregationMode::COMBINE;
		}
	}
	return false;
}

void WindowExecutor::Evaluate(idx_t row_idx, DataChunk &eval_chunk, Vector &result, WindowExecutorLocalState &lstate,
WindowExecutorGlobalState &gstate) const {
auto &lbstate = lstate.Cast<WindowExecutorBoundsState>();
lbstate.UpdateBounds(gstate, row_idx, eval_chunk, lstate.range_cursor);

const auto count = eval_chunk.size();
EvaluateInternal(gstate, lstate, eval_chunk, result, count, row_idx);

result.Verify(count);
return wexpr;
}

WindowAggregateExecutor::WindowAggregateExecutor(BoundWindowExpression &wexpr, ClientContext &context,
WindowSharedExpressions &shared, WindowAggregationMode mode)
: WindowExecutor(wexpr, context, shared), mode(mode) {
auto return_type = wexpr.return_type;
: WindowExecutor(SimplifyWindowedAggregate(wexpr, context), context, shared), mode(mode) {

// Force naive for SEPARATE mode or for (currently!) unsupported functionality
const auto force_naive =
!ClientConfig::GetConfig(context).enable_optimizer || mode == WindowAggregationMode::SEPARATE;
if (force_naive || (wexpr.distinct && wexpr.exclude_clause != WindowExcludeMode::NO_OTHER)) {
aggregator = make_uniq<WindowNaiveAggregator>(wexpr, wexpr.exclude_clause, shared);
} else if (IsDistinctAggregate()) {
if (!ClientConfig::GetConfig(context).enable_optimizer || mode == WindowAggregationMode::SEPARATE) {
aggregator = make_uniq<WindowNaiveAggregator>(*this, shared);
} else if (WindowDistinctAggregator::CanAggregate(wexpr)) {
// build a merge sort tree
// see https://dl.acm.org/doi/pdf/10.1145/3514221.3526184
aggregator = make_uniq<WindowDistinctAggregator>(wexpr, wexpr.exclude_clause, shared, context);
} else if (IsConstantAggregate()) {
aggregator = make_uniq<WindowConstantAggregator>(wexpr, wexpr.exclude_clause, shared);
} else if (IsCustomAggregate()) {
aggregator = make_uniq<WindowCustomAggregator>(wexpr, wexpr.exclude_clause, shared);
} else {
aggregator = make_uniq<WindowDistinctAggregator>(wexpr, shared, context);
} else if (WindowConstantAggregator::CanAggregate(wexpr)) {
aggregator = make_uniq<WindowConstantAggregator>(wexpr, shared);
} else if (WindowCustomAggregator::CanAggregate(wexpr, mode)) {
aggregator = make_uniq<WindowCustomAggregator>(wexpr, shared);
} else if (WindowSegmentTree::CanAggregate(wexpr)) {
// build a segment tree for frame-adhering aggregates
// see http://www.vldb.org/pvldb/vol8/p1058-leis.pdf
aggregator = make_uniq<WindowSegmentTree>(wexpr, mode, wexpr.exclude_clause, shared);
aggregator = make_uniq<WindowSegmentTree>(wexpr, shared);
} else {
// No accelerator can handle this combination, so fall back to naïve.
aggregator = make_uniq<WindowNaiveAggregator>(*this, shared);
}

// Compute the FILTER with the other eval columns.
Expand Down
9 changes: 4 additions & 5 deletions src/duckdb/src/function/window/window_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,17 @@ namespace duckdb {
//! Default constructor: initialises the `allocator` member from Allocator::DefaultAllocator().
WindowAggregatorState::WindowAggregatorState() : allocator(Allocator::DefaultAllocator()) {
}

WindowAggregator::WindowAggregator(const BoundWindowExpression &wexpr, const WindowExcludeMode exclude_mode_p)
WindowAggregator::WindowAggregator(const BoundWindowExpression &wexpr)
: wexpr(wexpr), aggr(wexpr), result_type(wexpr.return_type), state_size(aggr.function.state_size(aggr.function)),
exclude_mode(exclude_mode_p) {
exclude_mode(wexpr.exclude_clause) {

for (auto &child : wexpr.children) {
arg_types.emplace_back(child->return_type);
}
}

WindowAggregator::WindowAggregator(const BoundWindowExpression &wexpr, const WindowExcludeMode exclude_mode_p,
WindowSharedExpressions &shared)
: WindowAggregator(wexpr, exclude_mode_p) {
WindowAggregator::WindowAggregator(const BoundWindowExpression &wexpr, WindowSharedExpressions &shared)
: WindowAggregator(wexpr) {
for (auto &child : wexpr.children) {
child_idx.emplace_back(shared.RegisterCollection(child, false));
}
Expand Down
137 changes: 105 additions & 32 deletions src/duckdb/src/function/window/window_constant_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace duckdb {

//===--------------------------------------------------------------------===//
// WindowConstantAggregator
// WindowConstantAggregatorGlobalState
//===--------------------------------------------------------------------===//

class WindowConstantAggregatorGlobalState : public WindowAggregatorGlobalState {
Expand All @@ -24,33 +24,6 @@ class WindowConstantAggregatorGlobalState : public WindowAggregatorGlobalState {
unique_ptr<Vector> results;
};

//! Local state used with the constant aggregator. It holds a const reference to the
//! shared WindowConstantAggregatorGlobalState together with its own scratch chunks,
//! state-pointer vector and selection vector (see the members below).
class WindowConstantAggregatorLocalState : public WindowAggregatorLocalState {
public:
//! Builds a local state bound to the given global state
explicit WindowConstantAggregatorLocalState(const WindowConstantAggregatorGlobalState &gstate);
~WindowConstantAggregatorLocalState() override {
}

//! Declared here, defined out of line.
//! NOTE(review): presumably accumulates the (optionally filtered) rows of one chunk
//! into the aggregate state - confirm against the definition.
void Sink(DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx, optional_ptr<SelectionVector> filter_sel,
idx_t filtered);
//! Declared here, defined out of line.
//! NOTE(review): presumably folds this local state into the given global state - confirm.
void Combine(WindowConstantAggregatorGlobalState &gstate);

public:
//! The global state we are sharing
const WindowConstantAggregatorGlobalState &gstate;
//! Reusable chunk for sinking
DataChunk inputs;
//! Chunk for referencing the input columns
DataChunk payload_chunk;
//! A vector of pointers to "state", used for intermediate window segment aggregation
Vector statep;
//! Reused result state container for the window functions
WindowAggregateStates statef;
//! The current result partition being read
idx_t partition;
//! Shared SV for evaluation
SelectionVector matches;
};

WindowConstantAggregatorGlobalState::WindowConstantAggregatorGlobalState(ClientContext &context,
const WindowConstantAggregator &aggregator,
idx_t group_count,
Expand Down Expand Up @@ -93,6 +66,36 @@ WindowConstantAggregatorGlobalState::WindowConstantAggregatorGlobalState(ClientC
partition_offsets.emplace_back(group_count);
}

//===--------------------------------------------------------------------===//
// WindowConstantAggregatorLocalState
//===--------------------------------------------------------------------===//
//! Local state used with the constant aggregator. It holds a const reference to the
//! shared WindowConstantAggregatorGlobalState together with its own scratch chunks,
//! state-pointer vector and selection vector (see the members below).
class WindowConstantAggregatorLocalState : public WindowAggregatorLocalState {
public:
//! Builds a local state bound to the given global state
explicit WindowConstantAggregatorLocalState(const WindowConstantAggregatorGlobalState &gstate);
~WindowConstantAggregatorLocalState() override {
}

//! Declared here, defined out of line.
//! NOTE(review): presumably accumulates the (optionally filtered) rows of one chunk
//! into the aggregate state - confirm against the definition.
void Sink(DataChunk &sink_chunk, DataChunk &coll_chunk, idx_t input_idx, optional_ptr<SelectionVector> filter_sel,
idx_t filtered);
//! Declared here, defined out of line.
//! NOTE(review): presumably folds this local state into the given global state - confirm.
void Combine(WindowConstantAggregatorGlobalState &gstate);

public:
//! The global state we are sharing
const WindowConstantAggregatorGlobalState &gstate;
//! Reusable chunk for sinking
DataChunk inputs;
//! Chunk for referencing the input columns
DataChunk payload_chunk;
//! A vector of pointers to "state", used for intermediate window segment aggregation
Vector statep;
//! Reused result state container for the window functions
WindowAggregateStates statef;
//! The current result partition being read
idx_t partition;
//! Shared SV for evaluation
SelectionVector matches;
};

WindowConstantAggregatorLocalState::WindowConstantAggregatorLocalState(
const WindowConstantAggregatorGlobalState &gstate)
: gstate(gstate), statep(Value::POINTER(0)), statef(gstate.statef.aggr), partition(0) {
Expand All @@ -110,10 +113,80 @@ WindowConstantAggregatorLocalState::WindowConstantAggregatorLocalState(
gstate.locals++;
}

WindowConstantAggregator::WindowConstantAggregator(const BoundWindowExpression &wexpr,
const WindowExcludeMode exclude_mode_p,
WindowSharedExpressions &shared)
: WindowAggregator(wexpr, exclude_mode_p) {
//===--------------------------------------------------------------------===//
// WindowConstantAggregator
//===--------------------------------------------------------------------===//
//! Decides whether the constant aggregator can evaluate this window expression.
//! It must be a plain aggregate (no EXCLUDE, no DISTINCT, no ORDER BY arguments,
//! at least one argument) whose frame runs from the start of the partition to its end.
bool WindowConstantAggregator::CanAggregate(const BoundWindowExpression &wexpr) {
	// Only aggregate functions qualify
	if (!wexpr.aggregate) {
		return false;
	}

	// Modifiers that constant aggregation cannot handle:
	//  - window exclusion
	//  - DISTINCT aggregation (TODO: Use a hash table)
	//  - ORDER BY arguments
	const bool has_exclusion = wexpr.exclude_clause != WindowExcludeMode::NO_OTHER;
	const bool has_arg_orders = !wexpr.arg_orders.empty();
	if (has_exclusion || wexpr.distinct || has_arg_orders) {
		return false;
	}

	// COUNT(*) is already handled efficiently by segment trees.
	if (wexpr.children.empty()) {
		return false;
	}

	/*
	    The default framing option is RANGE UNBOUNDED PRECEDING, which
	    is the same as RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT
	    ROW; it sets the frame to be all rows from the partition start
	    up through the current row's last peer (a row that the window's
	    ORDER BY clause considers equivalent to the current row; all
	    rows are peers if there is no ORDER BY). In general, UNBOUNDED
	    PRECEDING means that the frame starts with the first row of the
	    partition, and similarly UNBOUNDED FOLLOWING means that the
	    frame ends with the last row of the partition, regardless of
	    RANGE, ROWS or GROUPS mode. In ROWS mode, CURRENT ROW means that
	    the frame starts or ends with the current row; but in RANGE or
	    GROUPS mode it means that the frame starts or ends with the
	    current row's first or last peer in the ORDER BY ordering. The
	    offset PRECEDING and offset FOLLOWING options vary in meaning
	    depending on the frame mode.

	    Consequently a CURRENT ROW (RANGE) boundary only reaches the edge
	    of the partition when there is no window ORDER BY at all.
	*/
	const bool all_rows_are_peers = wexpr.orders.empty();

	// The frame has to begin at the first row of the partition...
	const bool starts_at_partition_begin =
	    wexpr.start == WindowBoundary::UNBOUNDED_PRECEDING ||
	    (wexpr.start == WindowBoundary::CURRENT_ROW_RANGE && all_rows_are_peers);

	// ...and finish at its last row
	const bool ends_at_partition_end =
	    wexpr.end == WindowBoundary::UNBOUNDED_FOLLOWING ||
	    (wexpr.end == WindowBoundary::CURRENT_ROW_RANGE && all_rows_are_peers);

	return starts_at_partition_begin && ends_at_partition_end;
}

WindowConstantAggregator::WindowConstantAggregator(const BoundWindowExpression &wexpr, WindowSharedExpressions &shared)
: WindowAggregator(wexpr) {

// We only need these values for Sink
for (auto &child : wexpr.children) {
Expand Down
23 changes: 20 additions & 3 deletions src/duckdb/src/function/window/window_custom_aggregator.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,30 @@
#include "duckdb/function/window/window_custom_aggregator.hpp"
#include "duckdb/planner/expression/bound_window_expression.hpp"

namespace duckdb {

//===--------------------------------------------------------------------===//
// WindowCustomAggregator
//===--------------------------------------------------------------------===//
WindowCustomAggregator::WindowCustomAggregator(const BoundWindowExpression &wexpr, const WindowExcludeMode exclude_mode,
WindowSharedExpressions &shared)
: WindowAggregator(wexpr, exclude_mode, shared) {
//! Decides whether the custom aggregator can evaluate this window expression.
//! The expression must be an aggregate that supplies a window callback, must not
//! carry ORDER BY arguments, and the aggregation mode must be below COMBINE.
bool WindowCustomAggregator::CanAggregate(const BoundWindowExpression &wexpr, WindowAggregationMode mode) {
	// An aggregate with its own window implementation is required
	const bool has_window_callback = wexpr.aggregate && wexpr.aggregate->window;
	if (!has_window_callback) {
		return false;
	}

	// ORDER BY arguments are not currently supported
	const bool has_arg_orders = !wexpr.arg_orders.empty();

	return !has_arg_orders && mode < WindowAggregationMode::COMBINE;
}

//! Forwards the window expression and the shared expressions to the WindowAggregator
//! base constructor; no additional setup is performed in this constructor body.
WindowCustomAggregator::WindowCustomAggregator(const BoundWindowExpression &wexpr, WindowSharedExpressions &shared)
: WindowAggregator(wexpr, shared) {
}

WindowCustomAggregator::~WindowCustomAggregator() {
Expand Down
Loading