Commit

chore: Update vendored sources to duckdb/duckdb@2e6f748
Merge pull request duckdb/duckdb#11528 from lnkuiper/radixht_tasks_main
Merge pull request duckdb/duckdb#11553 from carlopi/fix_assets_to_staging
Merge pull request duckdb/duckdb#11556 from carlopi/fix_github_token
krlmlr committed Apr 8, 2024
1 parent dccad8f commit 3fd227b
Showing 2 changed files with 84 additions and 109 deletions.
187 changes: 81 additions & 106 deletions src/duckdb/src/execution/radix_partitioned_hashtable.cpp
@@ -69,13 +69,27 @@ unique_ptr<GroupedAggregateHashTable> RadixPartitionedHashTable::CreateHT(Client
//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
enum class AggregatePartitionState : uint8_t {
//! Can be finalized
READY_TO_FINALIZE = 0,
//! Finalize is in progress
FINALIZE_IN_PROGRESS = 1,
//! Finalized, ready to scan
READY_TO_SCAN = 2
};

struct AggregatePartition {
explicit AggregatePartition(unique_ptr<TupleDataCollection> data_p)
: data(std::move(data_p)), progress(0), finalized(false) {
: state(AggregatePartitionState::READY_TO_FINALIZE), data(std::move(data_p)), progress(0) {
}

mutex lock;
AggregatePartitionState state;

unique_ptr<TupleDataCollection> data;
atomic<double> progress;
atomic<bool> finalized;

vector<InterruptState> blocked_tasks;
};

class RadixHTGlobalSinkState;
@@ -165,9 +179,7 @@ class RadixHTGlobalSinkState : public GlobalSinkState {

//! Partitions that are finalized during GetData
vector<unique_ptr<AggregatePartition>> partitions;

//! For synchronizing finalize tasks
atomic<idx_t> finalize_idx;
//! For keeping track of progress
atomic<idx_t> finalize_done;

//! Pin properties when scanning
@@ -176,16 +188,13 @@ class RadixHTGlobalSinkState : public GlobalSinkState {
idx_t count_before_combining;
//! Maximum partition size if all unique
idx_t max_partition_size;

vector<InterruptState> blocked_tasks;
};

RadixHTGlobalSinkState::RadixHTGlobalSinkState(ClientContext &context_p, const RadixPartitionedHashTable &radix_ht_p)
: context(context_p), temporary_memory_state(TemporaryMemoryManager::Get(context).Register(context)),
radix_ht(radix_ht_p), config(context, *this), finalized(false), external(false), active_threads(0),
any_combined(false), finalize_idx(0), finalize_done(0),
scan_pin_properties(TupleDataPinProperties::DESTROY_AFTER_DONE), count_before_combining(0),
max_partition_size(0) {
any_combined(false), finalize_done(0), scan_pin_properties(TupleDataPinProperties::DESTROY_AFTER_DONE),
count_before_combining(0), max_partition_size(0) {

auto tuples_per_block = Storage::BLOCK_ALLOC_SIZE / radix_ht.GetLayout().GetRowWidth();
idx_t ht_count = config.sink_capacity / GroupedAggregateHashTable::LOAD_FACTOR;
@@ -516,8 +525,9 @@ void RadixPartitionedHashTable::Finalize(ClientContext &context, GlobalSinkState

gstate.partitions.emplace_back(make_uniq<AggregatePartition>(std::move(partition)));
if (single_ht) {
gstate.finalize_idx++;
gstate.partitions.back()->finalized = true;
gstate.finalize_done++;
gstate.partitions.back()->progress = 1;
gstate.partitions.back()->state = AggregatePartitionState::READY_TO_SCAN;
}
}
} else {
@@ -566,7 +576,8 @@ class RadixHTGlobalSourceState : public GlobalSourceState {
RadixHTGlobalSourceState(ClientContext &context, const RadixPartitionedHashTable &radix_ht);

//! Assigns a task to a local source state
bool AssignTask(RadixHTGlobalSinkState &sink, RadixHTLocalSourceState &lstate);
SourceResultType AssignTask(RadixHTGlobalSinkState &sink, RadixHTLocalSourceState &lstate,
InterruptState &interrupt_state);

public:
//! The client context
@@ -577,10 +588,10 @@ class RadixHTGlobalSourceState : public GlobalSourceState {
//! Column ids for scanning
vector<column_t> column_ids;

//! For synchronizing scan tasks
//! For synchronizing tasks
mutex lock;
idx_t scan_idx;
atomic<idx_t> scan_done;
idx_t task_idx;
atomic<idx_t> task_done;
};

enum class RadixHTScanStatus : uint8_t { INIT, IN_PROGRESS, DONE };
@@ -629,61 +640,44 @@ unique_ptr<LocalSourceState> RadixPartitionedHashTable::GetLocalSourceState(Exec
}

RadixHTGlobalSourceState::RadixHTGlobalSourceState(ClientContext &context_p, const RadixPartitionedHashTable &radix_ht)
: context(context_p), finished(false), scan_idx(0), scan_done(0) {
: context(context_p), finished(false), task_idx(0), task_done(0) {
for (column_t column_id = 0; column_id < radix_ht.group_types.size(); column_id++) {
column_ids.push_back(column_id);
}
}

bool RadixHTGlobalSourceState::AssignTask(RadixHTGlobalSinkState &sink, RadixHTLocalSourceState &lstate) {
D_ASSERT(lstate.scan_status != RadixHTScanStatus::IN_PROGRESS);

const auto n_partitions = sink.partitions.size();
SourceResultType RadixHTGlobalSourceState::AssignTask(RadixHTGlobalSinkState &sink, RadixHTLocalSourceState &lstate,
InterruptState &interrupt_state) {
// First, try to get a partition index
lock_guard<mutex> gstate_guard(lock);
if (finished) {
return false;
return SourceResultType::FINISHED;
}

// We first try to assign a Scan task, then a Finalize task if that didn't work
bool scan_assigned = false;
if (scan_idx < n_partitions && sink.partitions[scan_idx]->finalized) {
lstate.task_idx = scan_idx++;
scan_assigned = true;
if (scan_idx == n_partitions) {
// We will never be able to assign another task, unblock blocked tasks
lock_guard<mutex> sink_guard(sink.lock);
if (!sink.blocked_tasks.empty()) {
for (auto &state : sink.blocked_tasks) {
state.Callback();
}
sink.blocked_tasks.clear();
}
}
if (task_idx == sink.partitions.size()) {
return SourceResultType::FINISHED;
}
lstate.task_idx = task_idx++;

if (scan_assigned) {
// We successfully assigned a Scan task
D_ASSERT(lstate.task_idx < n_partitions && sink.partitions[lstate.task_idx]->finalized);
// We got a partition index
auto &partition = *sink.partitions[lstate.task_idx];
auto partition_lock = unique_lock<mutex>(partition.lock);
switch (partition.state) {
case AggregatePartitionState::READY_TO_FINALIZE:
partition.state = AggregatePartitionState::FINALIZE_IN_PROGRESS;
lstate.task = RadixHTSourceTaskType::FINALIZE;
return SourceResultType::HAVE_MORE_OUTPUT;
case AggregatePartitionState::FINALIZE_IN_PROGRESS:
lstate.task = RadixHTSourceTaskType::SCAN;
lstate.scan_status = RadixHTScanStatus::INIT;
return true;
}

// We didn't assign a Scan task
if (sink.finalize_idx >= n_partitions) {
lstate.ht.reset();
return false; // No finalize tasks left
}

// We can just increment the atomic here, much simpler than assigning the scan task
lstate.task_idx = sink.finalize_idx++;
if (lstate.task_idx < n_partitions) {
// We successfully assigned a Finalize task
lstate.task = RadixHTSourceTaskType::FINALIZE;
return true;
partition.blocked_tasks.push_back(interrupt_state);
return SourceResultType::BLOCKED;
case AggregatePartitionState::READY_TO_SCAN:
lstate.task = RadixHTSourceTaskType::SCAN;
lstate.scan_status = RadixHTScanStatus::INIT;
return SourceResultType::HAVE_MORE_OUTPUT;
default:
throw InternalException("Unexpected AggregatePartitionState in RadixHTLocalSourceState::Finalize!");
}

// We didn't manage to assign a Finalize task because there are none left
return false;
}

RadixHTLocalSourceState::RadixHTLocalSourceState(ExecutionContext &context, const RadixPartitionedHashTable &radix_ht)
@@ -699,6 +693,7 @@ RadixHTLocalSourceState::RadixHTLocalSourceState(ExecutionContext &context, cons

void RadixHTLocalSourceState::ExecuteTask(RadixHTGlobalSinkState &sink, RadixHTGlobalSourceState &gstate,
DataChunk &chunk) {
D_ASSERT(task != RadixHTSourceTaskType::NO_TASK);
switch (task) {
case RadixHTSourceTaskType::FINALIZE:
Finalize(sink, gstate);
@@ -714,12 +709,7 @@ void RadixHTLocalSourceState::ExecuteTask(RadixHTGlobalSinkState &sink, RadixHTG
void RadixHTLocalSourceState::Finalize(RadixHTGlobalSinkState &sink, RadixHTGlobalSourceState &gstate) {
D_ASSERT(task == RadixHTSourceTaskType::FINALIZE);
D_ASSERT(scan_status != RadixHTScanStatus::IN_PROGRESS);

auto &partition = *sink.partitions[task_idx];
if (partition.data->Count() == 0) {
partition.finalized = true;
return;
}

if (!ht) {
// This capacity would always be sufficient for all data
@@ -730,7 +720,7 @@ void RadixHTLocalSourceState::Finalize(RadixHTGlobalSinkState &sink, RadixHTGlob
const idx_t memory_limit = BufferManager::GetBufferManager(gstate.context).GetMaxMemory();
const idx_t thread_limit = 0.6 * memory_limit / n_threads;

const idx_t size_per_entry = partition.data->SizeInBytes() / partition.data->Count() +
const idx_t size_per_entry = partition.data->SizeInBytes() / MaxValue<idx_t>(partition.data->Count(), 1) +
idx_t(GroupedAggregateHashTable::LOAD_FACTOR * sizeof(aggr_ht_entry_t));
const auto capacity_limit = NextPowerOfTwo(thread_limit / size_per_entry);

@@ -745,69 +735,59 @@ void RadixHTLocalSourceState::Finalize(RadixHTGlobalSinkState &sink, RadixHTGlob
// Now combine the uncombined data using this thread's HT
ht->Combine(*partition.data, &partition.progress);
ht->UnpinData();
partition.progress = 1;

// Move the combined data back to the partition
partition.data =
make_uniq<TupleDataCollection>(BufferManager::GetBufferManager(gstate.context), sink.radix_ht.GetLayout());
partition.data->Combine(*ht->GetPartitionedData()->GetPartitions()[0]);

// Mark partition as ready to scan
lock_guard<mutex> glock(gstate.lock);
partition.finalized = true;

if (++sink.finalize_done == sink.partitions.size()) {
// Update thread-global state
lock_guard<mutex> global_guard(gstate.lock);
sink.stored_allocators.emplace_back(ht->GetAggregateAllocator());
const auto finalizes_done = ++sink.finalize_done;
D_ASSERT(finalizes_done <= sink.partitions.size());
if (finalizes_done == sink.partitions.size()) {
// All finalizes are done, set remaining size to 0
sink.temporary_memory_state->SetRemainingSize(sink.context, 0);
}

// Unblock blocked tasks so they can scan this partition
lock_guard<mutex> sink_guard(sink.lock);
if (!sink.blocked_tasks.empty()) {
for (auto &state : sink.blocked_tasks) {
state.Callback();
}
sink.blocked_tasks.clear();
// Update partition state
lock_guard<mutex> partition_guard(partition.lock);
partition.state = AggregatePartitionState::READY_TO_SCAN;
for (auto &blocked_task : partition.blocked_tasks) {
blocked_task.Callback();
}
partition.blocked_tasks.clear();

// Make sure this thread's aggregate allocator does not get lost
sink.stored_allocators.emplace_back(ht->GetAggregateAllocator());
// This thread will scan the partition
task = RadixHTSourceTaskType::SCAN;
scan_status = RadixHTScanStatus::INIT;
}

void RadixHTLocalSourceState::Scan(RadixHTGlobalSinkState &sink, RadixHTGlobalSourceState &gstate, DataChunk &chunk) {
D_ASSERT(task == RadixHTSourceTaskType::SCAN);
D_ASSERT(scan_status != RadixHTScanStatus::DONE);

auto &partition = *sink.partitions[task_idx];
D_ASSERT(partition.finalized);
D_ASSERT(partition.state == AggregatePartitionState::READY_TO_SCAN);
auto &data_collection = *partition.data;

if (data_collection.Count() == 0) {
scan_status = RadixHTScanStatus::DONE;
lock_guard<mutex> gstate_guard(gstate.lock);
if (++gstate.scan_done == sink.partitions.size()) {
gstate.finished = true;
}
return;
}

if (scan_status == RadixHTScanStatus::INIT) {
data_collection.InitializeScan(scan_state, gstate.column_ids, sink.scan_pin_properties);
scan_status = RadixHTScanStatus::IN_PROGRESS;
}

if (!data_collection.Scan(scan_state, scan_chunk)) {
scan_status = RadixHTScanStatus::DONE;
if (sink.scan_pin_properties == TupleDataPinProperties::DESTROY_AFTER_DONE) {
data_collection.Reset();
}
return;
}

if (data_collection.ScanComplete(scan_state)) {
scan_status = RadixHTScanStatus::DONE;
lock_guard<mutex> gstate_guard(gstate.lock);
if (++gstate.scan_done == sink.partitions.size()) {
if (++gstate.task_done == sink.partitions.size()) {
gstate.finished = true;
}
return;
}

RowOperationsState row_state(aggregate_allocator);
@@ -902,15 +882,10 @@ SourceResultType RadixPartitionedHashTable::GetData(ExecutionContext &context, D

while (!gstate.finished && chunk.size() == 0) {
if (lstate.TaskFinished()) {
lock_guard<mutex> gstate_guard(gstate.lock);
if (!gstate.AssignTask(sink, lstate)) {
if (gstate.scan_idx < sink.partitions.size()) {
lock_guard<mutex> sink_guard(sink.lock);
sink.blocked_tasks.push_back(input.interrupt_state);
return SourceResultType::BLOCKED;
} else {
return SourceResultType::FINISHED;
}
const auto res = gstate.AssignTask(sink, lstate, input.interrupt_state);
if (res != SourceResultType::HAVE_MORE_OUTPUT) {
D_ASSERT(res == SourceResultType::FINISHED || res == SourceResultType::BLOCKED);
return res;
}
}
lstate.ExecuteTask(sink, gstate, chunk);
@@ -931,11 +906,11 @@ double RadixPartitionedHashTable::GetProgress(ClientContext &, GlobalSinkState &
// Get partition combine progress, weigh it 2x
double total_progress = 0;
for (auto &partition : sink.partitions) {
total_progress += partition->progress * 2.0;
total_progress += 2.0 * partition->progress;
}

// Get scan progress, weigh it 1x
total_progress += gstate.scan_done;
total_progress += 1.0 * gstate.task_done;

// Divide by 3x for the weights, and the number of partitions to get a value between 0 and 1 again
total_progress /= 3.0 * sink.partitions.size();
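For orientation, below is a minimal standalone sketch of the per-partition state machine this commit introduces: each partition now carries its own lock, state, and list of blocked tasks, and task assignment walks a single task_idx, deciding per partition whether to finalize, scan, block, or finish. The types and names here (Partition, TaskType, AssignResult, interrupt_token) are simplified stand-ins for illustration, not the actual DuckDB classes.

// Simplified model of the per-partition task assignment added in this commit.
// Not DuckDB code; types and names are illustrative assumptions.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <vector>

enum class PartitionState : uint8_t { READY_TO_FINALIZE, FINALIZE_IN_PROGRESS, READY_TO_SCAN };
enum class TaskType { NO_TASK, FINALIZE, SCAN };
enum class AssignResult { HAVE_MORE_OUTPUT, BLOCKED, FINISHED };

struct Partition {
    std::mutex lock;
    PartitionState state = PartitionState::READY_TO_FINALIZE;
    std::vector<int> blocked_tasks; // stand-in for vector<InterruptState>
};

struct SourceState {
    std::mutex lock;
    size_t task_idx = 0; // single counter replacing separate scan/finalize indices
};

// Mirrors the control flow of AssignTask after this change: take the next
// partition index, then branch on that partition's state under its own lock.
AssignResult AssignTask(std::vector<std::unique_ptr<Partition>> &partitions, SourceState &gstate,
                        TaskType &task_out, size_t &task_idx_out, int interrupt_token) {
    std::lock_guard<std::mutex> gstate_guard(gstate.lock);
    if (gstate.task_idx == partitions.size()) {
        return AssignResult::FINISHED; // every partition has been handed out
    }
    task_idx_out = gstate.task_idx++;

    auto &partition = *partitions[task_idx_out];
    std::lock_guard<std::mutex> partition_guard(partition.lock);
    switch (partition.state) {
    case PartitionState::READY_TO_FINALIZE:
        partition.state = PartitionState::FINALIZE_IN_PROGRESS;
        task_out = TaskType::FINALIZE;
        return AssignResult::HAVE_MORE_OUTPUT;
    case PartitionState::FINALIZE_IN_PROGRESS:
        // Another thread is finalizing this partition: park the task on the
        // partition itself; the finalizing thread wakes it up when done.
        partition.blocked_tasks.push_back(interrupt_token);
        return AssignResult::BLOCKED;
    case PartitionState::READY_TO_SCAN:
        task_out = TaskType::SCAN;
        return AssignResult::HAVE_MORE_OUTPUT;
    }
    return AssignResult::FINISHED; // unreachable
}

int main() {
    std::vector<std::unique_ptr<Partition>> partitions;
    partitions.push_back(std::make_unique<Partition>());
    SourceState gstate;
    TaskType task = TaskType::NO_TASK;
    size_t task_idx = 0;
    auto res = AssignTask(partitions, gstate, task, task_idx, /*interrupt_token=*/0);
    std::cout << "assigned finalize: "
              << (res == AssignResult::HAVE_MORE_OUTPUT && task == TaskType::FINALIZE) << "\n";
}

Compared to the previous scheme, blocked tasks are stored per partition rather than in one sink-wide list, so only the tasks waiting on a given partition are woken when its finalize completes.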
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "2-dev497"
#define DUCKDB_PATCH_VERSION "2-dev504"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 10
@@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 0
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v0.10.2-dev497"
#define DUCKDB_VERSION "v0.10.2-dev504"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "b4408a8aab"
#define DUCKDB_SOURCE_ID "2e6f74803b"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
