Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #9950: Ordered Aggregate Performance #10045

Merged
merged 15 commits into from Jan 8, 2024
Merged
6 changes: 3 additions & 3 deletions benchmark/micro/aggregate/ordered_first.benchmark
Expand Up @@ -5,13 +5,13 @@
name Ordered First (Grouped)
group aggregate

#load
#PRAGMA ordered_aggregate_threshold=262144
load
CREATE TABLE t AS FROM range(10000000) tbl(i);

run
SELECT SUM(agg) FROM (
SELECT i // 2048 AS grp, FIRST(i ORDER BY i DESC) AS agg
FROM range(10000000) tbl(i)
FROM t
GROUP BY ALL
)

Expand Down
5 changes: 1 addition & 4 deletions src/common/types/list_segment.cpp
Expand Up @@ -339,10 +339,7 @@ static void WriteDataToArraySegment(const ListSegmentFunctions &functions, Arena
auto valid = input_data.unified.validity.RowIsValid(sel_entry_idx);
null_mask[segment->count] = !valid;

if (!valid) {
return;
}

// Arrays require there to be values in the child even when the entry is NULL.
auto array_size = ArrayType::GetSize(input_data.logical_type);
auto array_offset = sel_entry_idx * array_size;

Expand Down
29 changes: 20 additions & 9 deletions src/execution/operator/aggregate/physical_ungrouped_aggregate.cpp
Expand Up @@ -85,29 +85,41 @@ struct AggregateState {
class UngroupedAggregateGlobalSinkState : public GlobalSinkState {
public:
UngroupedAggregateGlobalSinkState(const PhysicalUngroupedAggregate &op, ClientContext &client)
: state(op.aggregates), finished(false), allocator(BufferAllocator::Get(client)) {
: state(op.aggregates), finished(false), client_allocator(BufferAllocator::Get(client)),
allocator(client_allocator) {
if (op.distinct_data) {
distinct_state = make_uniq<DistinctAggregateState>(*op.distinct_data, client);
}
}

//! Create an ArenaAllocator with cross-thread lifetime
ArenaAllocator &CreateAllocator() const {
lock_guard<mutex> glock(lock);
stored_allocators.emplace_back(make_uniq<ArenaAllocator>(client_allocator));
return *stored_allocators.back();
}

//! The lock for updating the global aggregate state
mutex lock;
mutable mutex lock;
//! The global aggregate state
AggregateState state;
//! Whether or not the aggregate is finished
bool finished;
//! The data related to the distinct aggregates (if there are any)
unique_ptr<DistinctAggregateState> distinct_state;
//! Client base allocator
Allocator &client_allocator;
//! Global arena allocator
ArenaAllocator allocator;
//! Allocator pool
mutable vector<unique_ptr<ArenaAllocator>> stored_allocators;
};

class UngroupedAggregateLocalSinkState : public LocalSinkState {
public:
UngroupedAggregateLocalSinkState(const PhysicalUngroupedAggregate &op, const vector<LogicalType> &child_types,
GlobalSinkState &gstate_p, ExecutionContext &context)
: allocator(BufferAllocator::Get(context.client)), state(op.aggregates), child_executor(context.client),
UngroupedAggregateGlobalSinkState &gstate_p, ExecutionContext &context)
: allocator(gstate_p.CreateAllocator()), state(op.aggregates), child_executor(context.client),
aggregate_input_chunk(), filter_set() {
auto &gstate = gstate_p.Cast<UngroupedAggregateGlobalSinkState>();

Expand All @@ -133,7 +145,7 @@ class UngroupedAggregateLocalSinkState : public LocalSinkState {
}

//! Local arena allocator
ArenaAllocator allocator;
ArenaAllocator &allocator;
//! The local aggregate state
AggregateState state;
//! The executor
Expand Down Expand Up @@ -192,7 +204,7 @@ unique_ptr<GlobalSinkState> PhysicalUngroupedAggregate::GetGlobalSinkState(Clien

unique_ptr<LocalSinkState> PhysicalUngroupedAggregate::GetLocalSinkState(ExecutionContext &context) const {
D_ASSERT(sink_state);
auto &gstate = *sink_state;
auto &gstate = sink_state->Cast<UngroupedAggregateGlobalSinkState>();
return make_uniq<UngroupedAggregateLocalSinkState>(*this, children[0]->GetTypes(), gstate, context);
}

Expand Down Expand Up @@ -348,7 +360,6 @@ SinkCombineResultType PhysicalUngroupedAggregate::Combine(ExecutionContext &cont
gstate.state.counts[aggr_idx] += lstate.state.counts[aggr_idx];
#endif
}
lstate.allocator.Destroy();

auto &client_profiler = QueryProfiler::Get(context.client);
context.thread.profiler.Flush(*this, lstate.child_executor, "child_executor", 0);
Expand Down Expand Up @@ -391,7 +402,7 @@ class UngroupedDistinctAggregateFinalizeTask : public ExecutorTask {
const PhysicalUngroupedAggregate &op,
UngroupedAggregateGlobalSinkState &state_p)
: ExecutorTask(executor), event(std::move(event_p)), op(op), gstate(state_p),
allocator(BufferAllocator::Get(executor.context)) {
allocator(gstate.CreateAllocator()) {
}

TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override;
Expand All @@ -405,7 +416,7 @@ class UngroupedDistinctAggregateFinalizeTask : public ExecutorTask {
const PhysicalUngroupedAggregate &op;
UngroupedAggregateGlobalSinkState &gstate;

ArenaAllocator allocator;
ArenaAllocator &allocator;
};

void UngroupedDistinctAggregateFinalizeEvent::Schedule() {
Expand Down