Skip to content

Commit

Permalink
Internal duckdb#861: Aggregation Absorb API
Browse files Browse the repository at this point in the history
First refactor of segment trees for order-sensitive aggregates.
  • Loading branch information
hawkfish committed Dec 20, 2023
1 parent 3242ad6 commit 3e991b5
Showing 1 changed file with 47 additions and 19 deletions.
66 changes: 47 additions & 19 deletions src/execution/window_segment_tree.cpp
Expand Up @@ -653,14 +653,23 @@ class WindowSegmentTreePart {
void ExtractFrame(idx_t begin, idx_t end, data_ptr_t current_state);
void WindowSegmentValue(const WindowSegmentTree &tree, idx_t l_idx, idx_t begin, idx_t end,
data_ptr_t current_state);
//! optionally writes result and calls destructors
//! Writes result and calls destructors
void Finalize(Vector &result, idx_t count);

void Combine(WindowSegmentTreePart &other, idx_t count);

void Evaluate(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends, Vector &result, idx_t count,
idx_t row_idx, FramePart frame_part);

protected:
//! Initialises the accumulation state vector (statef)
void Initialize(idx_t count);
//! Accumulate upper tree levels
void EvaluateUpperLevels(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends, idx_t count,
idx_t row_idx, FramePart frame_part);
void EvaluateLeaves(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends, idx_t count,
idx_t row_idx, FramePart frame_part);

public:
//! Allocator for aggregates
ArenaAllocator &allocator;
Expand Down Expand Up @@ -898,20 +907,23 @@ void WindowSegmentTree::Evaluate(WindowAggregatorState &lstate, const DataChunk
part.Finalize(result, count);
}

void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends,
Vector &result, idx_t count, idx_t row_idx, FramePart frame_part) {
void WindowSegmentTreePart::Initialize(idx_t count) {
auto fdata = FlatVector::GetData<data_ptr_t>(statef);
for (idx_t rid = 0; rid < count; ++rid) {
auto state_ptr = fdata[rid];
aggr.function.initialize(state_ptr);
}
}

const auto cant_combine = (!aggr.function.combine || !tree.UseCombineAPI());
void WindowSegmentTreePart::EvaluateUpperLevels(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends,
idx_t count, idx_t row_idx, FramePart frame_part) {
auto fdata = FlatVector::GetData<data_ptr_t>(statef);

const auto exclude_mode = tree.exclude_mode;
const bool begin_on_curr_row = frame_part == FramePart::RIGHT && exclude_mode == WindowExcludeMode::CURRENT_ROW;
const bool end_on_curr_row = frame_part == FramePart::LEFT && exclude_mode == WindowExcludeMode::CURRENT_ROW;
// with EXCLUDE TIES, in addition to the frame part right of the peer group's end, we also need to consider the
// current row
const bool add_curr_row = frame_part == FramePart::RIGHT && exclude_mode == WindowExcludeMode::TIES;
const bool can_reuse = aggr.function.order_dependent == AggregateOrderDependent::NOT_ORDER_DEPENDENT;

// First pass: aggregate the segment tree nodes
// Share adjacent identical states
// We do this first because we want to share only tree aggregations
idx_t prev_begin = 1;
Expand All @@ -921,12 +933,6 @@ void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t
data_ptr_t prev_state = nullptr;
for (idx_t rid = 0, cur_row = row_idx; rid < count; ++rid, ++cur_row) {
auto state_ptr = fdata[rid];
aggr.function.initialize(state_ptr);

if (cant_combine) {
// Make sure we initialise all states
continue;
}

auto begin = begin_on_curr_row ? cur_row + 1 : begins[rid];
auto end = end_on_curr_row ? cur_row : ends[rid];
Expand All @@ -949,7 +955,7 @@ void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t
break;
}

if (l_idx == 1) {
if (can_reuse && l_idx == 1) {
prev_state = state_ptr;
prev_begin = begin;
prev_end = end;
Expand Down Expand Up @@ -979,9 +985,19 @@ void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t
}
}
FlushStates(true);
}

void WindowSegmentTreePart::EvaluateLeaves(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends,
idx_t count, idx_t row_idx, FramePart frame_part) {
auto fdata = FlatVector::GetData<data_ptr_t>(statef);

const auto exclude_mode = tree.exclude_mode;
const bool begin_on_curr_row = frame_part == FramePart::RIGHT && exclude_mode == WindowExcludeMode::CURRENT_ROW;
const bool end_on_curr_row = frame_part == FramePart::LEFT && exclude_mode == WindowExcludeMode::CURRENT_ROW;
// with EXCLUDE TIES, in addition to the frame part right of the peer group's end, we also need to consider the
// current row
const bool add_curr_row = frame_part == FramePart::RIGHT && exclude_mode == WindowExcludeMode::TIES;

// Second pass: aggregate the ragged leaves
// (or everything if we can't combine)
for (idx_t rid = 0, cur_row = row_idx; rid < count; ++rid, ++cur_row) {
auto state_ptr = fdata[rid];

Expand All @@ -994,10 +1010,9 @@ void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t
continue;
}

// Aggregate everything at once if we can't combine states
idx_t parent_begin = begin / tree.TREE_FANOUT;
idx_t parent_end = end / tree.TREE_FANOUT;
if (parent_begin == parent_end || cant_combine) {
if (parent_begin == parent_end) {
WindowSegmentValue(tree, 0, begin, end, state_ptr);
continue;
}
Expand All @@ -1015,6 +1030,19 @@ void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t
FlushStates(false);
}

void WindowSegmentTreePart::Evaluate(const WindowSegmentTree &tree, const idx_t *begins, const idx_t *ends,
Vector &result, idx_t count, idx_t row_idx, FramePart frame_part) {
D_ASSERT(aggr.function.combine && tree.UseCombineAPI());

Initialize(count);

// First pass: aggregate the segment tree nodes
EvaluateUpperLevels(tree, begins, ends, count, row_idx, frame_part);

// Second pass: aggregate the ragged leaves
EvaluateLeaves(tree, begins, ends, count, row_idx, frame_part);
}

//===--------------------------------------------------------------------===//
// WindowDistinctAggregator
//===--------------------------------------------------------------------===//
Expand Down

0 comments on commit 3e991b5

Please sign in to comment.