From 904d22047983aaa207f38fec65f1b2d1f782ba36 Mon Sep 17 00:00:00 2001
From: Tom Ebergen
Date: Fri, 13 Jan 2023 13:50:43 +0100
Subject: [PATCH] Collect reservoir samples in parallel; combining the
 thread-local samples into a single uniform, weighted sample is still TODO

---
 .../helper/physical_reservoir_sample.cpp      | 50 ++++++++++++++++---
 src/execution/reservoir_sample.cpp            | 21 ++++----
 .../aggregate/holistic/reservoir_quantile.cpp |  6 +--
 .../helper/physical_reservoir_sample.hpp      |  5 +-
 .../duckdb/execution/physical_operator.hpp    |  5 +-
 .../duckdb/execution/reservoir_sample.hpp     |  7 ++-
 test/sql/sample/reservoir_testing.test        |  2 +-
 test/sql/sample/test_sample.test              |  9 +++-
 8 files changed, 80 insertions(+), 25 deletions(-)

diff --git a/src/execution/operator/helper/physical_reservoir_sample.cpp b/src/execution/operator/helper/physical_reservoir_sample.cpp
index fbe35a8e8cd..17c6aa8faf1 100644
--- a/src/execution/operator/helper/physical_reservoir_sample.cpp
+++ b/src/execution/operator/helper/physical_reservoir_sample.cpp
@@ -1,14 +1,17 @@
 #include "duckdb/execution/operator/helper/physical_reservoir_sample.hpp"
 #include "duckdb/execution/reservoir_sample.hpp"
+#include <iostream>
 
 namespace duckdb {
 
 //===--------------------------------------------------------------------===//
 // Sink
 //===--------------------------------------------------------------------===//
-class SampleGlobalSinkState : public GlobalSinkState {
+class SampleLocalSinkState : public LocalSinkState {
 public:
-	explicit SampleGlobalSinkState(Allocator &allocator, SampleOptions &options) {
+	explicit SampleLocalSinkState(ClientContext &context, const PhysicalReservoirSample &sampler, SampleOptions &options) {
+		// initialize a separate reservoir sample for this thread's local state
+		auto &allocator = Allocator::Get(context);
 		if (options.is_percentage) {
 			auto percentage = options.sample_size.GetValue<double>();
 			if (percentage == 0) {
@@ -30,24 +33,59 @@ class SampleGlobalSinkState : public GlobalSinkState {
 	unique_ptr<BlockingSample> sample;
 };
 
+class SampleGlobalSinkState : public GlobalSinkState {
+public:
+	explicit SampleGlobalSinkState(Allocator &allocator, SampleOptions &options) {
+	}
+
+	//! The lock for updating the global aggregate state
+	mutex lock;
+	//! The reservoir sample
+	unique_ptr<BlockingSample> sample;
+	vector<unique_ptr<BlockingSample>> intermediate_samples;
+};
+
 unique_ptr<GlobalSinkState> PhysicalReservoirSample::GetGlobalSinkState(ClientContext &context) const {
 	return make_unique<SampleGlobalSinkState>(Allocator::Get(context), *options);
 }
 
+unique_ptr<LocalSinkState> PhysicalReservoirSample::GetLocalSinkState(ExecutionContext &context) const {
+	return make_unique<SampleLocalSinkState>(context.client, *this, *options);
+}
+
 SinkResultType PhysicalReservoirSample::Sink(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate,
                                              DataChunk &input) const {
-	auto &gstate = (SampleGlobalSinkState &)state;
-	if (!gstate.sample) {
+	auto &local_state = (SampleLocalSinkState &)lstate;
+	if (!local_state.sample) {
 		return SinkResultType::FINISHED;
 	}
+	// here we add samples to the local state; they are handed to the global state in Combine()
+	// TODO: figure out how to merge the thread-local samples into one uniform, weighted sample;
+	// a thread cannot know up front how much data it will eventually collect
+
 	// we implement reservoir sampling without replacement and exponential jumps here
 	// the algorithm is adopted from the paper Weighted random sampling with a reservoir by Pavlos S. Efraimidis et al.
 	// note that the original algorithm is about weighted sampling; this is a simplified approach for uniform sampling
-	lock_guard<mutex> glock(gstate.lock);
-	gstate.sample->AddToReservoir(input);
+	local_state.sample->AddToReservoir(input);
 	return SinkResultType::NEED_MORE_INPUT;
 }
 
+void PhysicalReservoirSample::Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const {
+	auto &global_state = (SampleGlobalSinkState &)gstate;
+	auto &local_state = (SampleLocalSinkState &)lstate;
+	lock_guard<mutex> glock(global_state.lock);
+	global_state.intermediate_samples.push_back(move(local_state.sample));
+}
+
+SinkFinalizeType PhysicalReservoirSample::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, GlobalSinkState &gstate) const {
+	auto &global_state = (SampleGlobalSinkState &)gstate;
+	global_state.sample = move(global_state.intermediate_samples.back());
+	global_state.intermediate_samples.pop_back();
+	return SinkFinalizeType::READY;
+}
+
 //===--------------------------------------------------------------------===//
 // Source
 //===--------------------------------------------------------------------===//
diff --git a/src/execution/reservoir_sample.cpp b/src/execution/reservoir_sample.cpp
index ddfeb25cb22..b25fbb138a7 100644
--- a/src/execution/reservoir_sample.cpp
+++ b/src/execution/reservoir_sample.cpp
@@ -12,6 +12,7 @@ void ReservoirSample::AddToReservoir(DataChunk &input) {
 	if (sample_count == 0) {
 		return;
 	}
+	base_reservoir_sample.num_entries_seen_total += input.size();
 	// Input: A population V of n weighted items
 	// Output: A reservoir R with a size m
 	// 1: The first m items of V are inserted into R
@@ -23,14 +24,14 @@ void ReservoirSample::AddToReservoir(DataChunk &input) {
 		}
 	}
 	D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
-	// find the position of next_index_to_sample relative to number of seen entries (num_seen_entries)
+	// find the position of next_index_to_sample relative to number of seen entries (num_entries_to_skip_b4_next_sample)
 	idx_t remaining = input.size();
 	idx_t base_offset = 0;
 	while (true) {
-		idx_t offset = base_reservoir_sample.next_index_to_sample - base_reservoir_sample.num_seen_entries;
+		idx_t offset = base_reservoir_sample.next_index_to_sample - base_reservoir_sample.num_entries_to_skip_b4_next_sample;
 		if (offset >= remaining) {
 			// not in this chunk! increment current count and go to the next chunk
-			base_reservoir_sample.num_seen_entries += remaining;
+			base_reservoir_sample.num_entries_to_skip_b4_next_sample += remaining;
 			return;
 		}
 		D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
@@ -75,7 +76,7 @@ void ReservoirSample::ReplaceElement(DataChunk &input, idx_t index_in_chunk) {
 	D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
 	for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) {
 		D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
-		reservoir_chunk->SetValue(col_idx, base_reservoir_sample.min_weighted_entry,
+		reservoir_chunk->SetValue(col_idx, base_reservoir_sample.min_weighted_entry_index,
 		                          input.GetValue(col_idx, index_in_chunk));
 	}
 	base_reservoir_sample.ReplaceElement();
@@ -144,6 +145,7 @@ ReservoirSamplePercentage::ReservoirSamplePercentage(Allocator &allocator, doubl
 }
 
 void ReservoirSamplePercentage::AddToReservoir(DataChunk &input) {
+	base_reservoir_sample.num_entries_seen_total += input.size();
 	if (current_count + input.size() > RESERVOIR_THRESHOLD) {
 		// we don't have enough space in our current reservoir
 		// first check what we still need to append to the current sample
@@ -228,8 +230,9 @@ void ReservoirSamplePercentage::Finalize() {
 BaseReservoirSampling::BaseReservoirSampling(int64_t seed) : random(seed) {
 	next_index_to_sample = 0;
 	min_weight_threshold = 0;
-	min_weighted_entry = 0;
-	num_seen_entries = 0;
+	min_weighted_entry_index = 0;
+	num_entries_to_skip_b4_next_sample = 0;
+	num_entries_seen_total = 0;
 }
 
 BaseReservoirSampling::BaseReservoirSampling() : BaseReservoirSampling(-1) {
 }
@@ -260,9 +263,9 @@ void BaseReservoirSampling::SetNextEntry() {
 	//! 6. wc +wc+1 +···+wi−1 < Xw <= wc +wc+1 +···+wi−1 +wi
 	//! since all our weights are 1 (uniform sampling), we can just determine the amount of elements to skip
 	min_weight_threshold = t_w;
-	min_weighted_entry = min_key.second;
+	min_weighted_entry_index = min_key.second;
 	next_index_to_sample = MaxValue<idx_t>(1, idx_t(round(x_w)));
-	num_seen_entries = 0;
+	num_entries_to_skip_b4_next_sample = 0;
 }
 
 void BaseReservoirSampling::ReplaceElement() {
@@ -275,7 +278,7 @@ void BaseReservoirSampling::ReplaceElement() {
 	//! we generate a random number between (min_weight_threshold, 1)
 	double r2 = random.NextRandom(min_weight_threshold, 1);
 	//! now we insert the new weight into the reservoir
-	reservoir_weights.push(std::make_pair(-r2, min_weighted_entry));
+	reservoir_weights.push(std::make_pair(-r2, min_weighted_entry_index));
 	//! we update the min entry with the new min entry in the reservoir
 	SetNextEntry();
 }
 
diff --git a/src/function/aggregate/holistic/reservoir_quantile.cpp b/src/function/aggregate/holistic/reservoir_quantile.cpp
index 7a91a8dc134..7476a03dd25 100644
--- a/src/function/aggregate/holistic/reservoir_quantile.cpp
+++ b/src/function/aggregate/holistic/reservoir_quantile.cpp
@@ -29,7 +29,7 @@ struct ReservoirQuantileState {
 	}
 
 	void ReplaceElement(T &input) {
-		v[r_samp->min_weighted_entry] = input;
+		v[r_samp->min_weighted_entry_index] = input;
 		r_samp->ReplaceElement();
 	}
 
@@ -38,8 +38,8 @@ struct ReservoirQuantileState {
 			v[pos++] = element;
 			r_samp->InitializeReservoir(pos, len);
 		} else {
-			D_ASSERT(r_samp->next_index_to_sample >= r_samp->num_seen_entries);
-			if (r_samp->next_index_to_sample == r_samp->num_seen_entries) {
+			D_ASSERT(r_samp->next_index_to_sample >= r_samp->num_entries_to_skip_b4_next_sample);
+			if (r_samp->next_index_to_sample == r_samp->num_entries_to_skip_b4_next_sample) {
 				ReplaceElement(element);
 			}
 		}
diff --git a/src/include/duckdb/execution/operator/helper/physical_reservoir_sample.hpp b/src/include/duckdb/execution/operator/helper/physical_reservoir_sample.hpp
index c7ceefabec1..36cbef798ae 100644
--- a/src/include/duckdb/execution/operator/helper/physical_reservoir_sample.hpp
+++ b/src/include/duckdb/execution/operator/helper/physical_reservoir_sample.hpp
@@ -35,7 +35,10 @@ class PhysicalReservoirSample : public PhysicalOperator {
 	SinkResultType Sink(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate,
 	                    DataChunk &input) const override;
 	unique_ptr<GlobalSinkState> GetGlobalSinkState(ClientContext &context) const override;
-
+	unique_ptr<LocalSinkState> GetLocalSinkState(ExecutionContext &context) const override;
+	void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const override;
+	SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
+	                          GlobalSinkState &gstate) const override;
 	bool ParallelSink() const override {
 		return true;
 	}
diff --git a/src/include/duckdb/execution/physical_operator.hpp b/src/include/duckdb/execution/physical_operator.hpp
index 51871c4b079..98ae68b9d50 100644
--- a/src/include/duckdb/execution/physical_operator.hpp
+++ b/src/include/duckdb/execution/physical_operator.hpp
@@ -186,7 +186,8 @@ class PhysicalOperator {
 	// Sink interface
 
 	//! The sink method is called constantly with new input, as long as new input is available. Note that this method
-	//! CAN be called in parallel, proper locking is needed when accessing data inside the GlobalSinkState.
+	//! CAN be called in parallel, proper locking is needed when accessing data
+	//! inside the GlobalSinkState.
 	virtual SinkResultType Sink(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate,
 	                            DataChunk &input) const;
 	// The combine is called when a single thread has completed execution of its part of the pipeline, it is the final
@@ -195,7 +196,7 @@ class PhysicalOperator {
 	virtual void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const;
 	//! The finalize is called when ALL threads are finished execution. It is called only once per pipeline, and is
 	//! entirely single threaded.
-	//! If Finalize returns SinkResultType::FINISHED, the sink is marked as finished
+	//! If Finalize returns SinkResultType::Finished, the sink is marked as finished
 	virtual SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
 	                                  GlobalSinkState &gstate) const;
 
diff --git a/src/include/duckdb/execution/reservoir_sample.hpp b/src/include/duckdb/execution/reservoir_sample.hpp
index e858cdff394..461fe346557 100644
--- a/src/include/duckdb/execution/reservoir_sample.hpp
+++ b/src/include/duckdb/execution/reservoir_sample.hpp
@@ -35,10 +35,13 @@ class BaseReservoirSampling {
 	//! The reservoir threshold of the current min entry
 	double min_weight_threshold;
 	//! The reservoir index of the current min entry
-	idx_t min_weighted_entry;
+	idx_t min_weighted_entry_index;
 	//! The current count towards next index (i.e. we will replace an entry in next_index - current_count tuples)
 	//! The number of entries "seen" before choosing one that will go in our reservoir sample.
-	idx_t num_seen_entries;
+	idx_t num_entries_to_skip_b4_next_sample;
+	//! when collecting a sample in parallel, we need to know how many values each thread has seen
+	//! so that the thread-local samples can be combined in a uniform manner
+	idx_t num_entries_seen_total;
 };
 
 class BlockingSample {
diff --git a/test/sql/sample/reservoir_testing.test b/test/sql/sample/reservoir_testing.test
index 547731dc962..793344c3012 100644
--- a/test/sql/sample/reservoir_testing.test
+++ b/test/sql/sample/reservoir_testing.test
@@ -5,7 +5,7 @@
 require tpch
 
 statement ok
-CALL dbgen(sf=0.1);
+CALL dbgen(sf=5);
 
 statement ok
 PRAGMA enable_verification;
diff --git a/test/sql/sample/test_sample.test b/test/sql/sample/test_sample.test
index bd60ed9a9c4..fc54468f3b0 100644
--- a/test/sql/sample/test_sample.test
+++ b/test/sql/sample/test_sample.test
@@ -67,7 +67,14 @@ SELECT COUNT(*) FROM range(10000) USING SAMPLE 5
 query I
 SELECT COUNT(*) FROM range(1000000) USING SAMPLE 1000100
 ----
-100000
+1000000
+
+
+query I
+SELECT COUNT(*) FROM range(1000000) USING SAMPLE 2
+----
+2
+
 
 # test sample with multiple columns
 # we insert the same data in the entire column
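
Note (not part of the patch): the Finalize() above currently keeps only one of the thread-local samples and discards the rest; the commit message leaves the uniform, weighted combination as an open problem. Below is a minimal, standalone C++ sketch of one way such a merge could work, using the per-thread tuple count that num_entries_seen_total introduces: each merged row is drawn from a source reservoir with probability proportional to how many tuples that source has seen. The LocalReservoir struct and MergeReservoirs function are invented for illustration only (they are not DuckDB APIs), and keeping the selection weights fixed while drawing is a simplification rather than an exact weighted-merge algorithm.

// merge_sketch.cpp -- standalone illustration, not part of the patch
#include <cstdint>
#include <iostream>
#include <random>
#include <vector>

// One thread's finished sample: the sampled rows plus how many tuples the thread saw in total
// (the role num_entries_seen_total plays in the patch).
struct LocalReservoir {
	std::vector<int64_t> rows;
	uint64_t entries_seen_total;
};

// Build a combined sample of target_size rows by repeatedly picking a source reservoir with
// probability proportional to the number of tuples it represents, then drawing one of its rows
// without replacement.
std::vector<int64_t> MergeReservoirs(LocalReservoir a, LocalReservoir b, size_t target_size, std::mt19937_64 &gen) {
	std::vector<int64_t> merged;
	while (merged.size() < target_size && (!a.rows.empty() || !b.rows.empty())) {
		double weight_a = a.rows.empty() ? 0.0 : static_cast<double>(a.entries_seen_total);
		double weight_b = b.rows.empty() ? 0.0 : static_cast<double>(b.entries_seen_total);
		double total = weight_a + weight_b;
		if (total == 0.0) {
			break;
		}
		std::bernoulli_distribution pick_a(weight_a / total);
		LocalReservoir &src = pick_a(gen) ? a : b;
		std::uniform_int_distribution<size_t> pick_row(0, src.rows.size() - 1);
		size_t i = pick_row(gen);
		merged.push_back(src.rows[i]);
		// remove the drawn row; order inside a reservoir does not matter
		src.rows[i] = src.rows.back();
		src.rows.pop_back();
	}
	return merged;
}

int main() {
	std::mt19937_64 gen(42);
	LocalReservoir a{{1, 2, 3, 4}, 1000};    // this thread saw 1000 tuples
	LocalReservoir b{{10, 20, 30, 40}, 250}; // this thread saw 250 tuples
	for (auto row : MergeReservoirs(a, b, 4, gen)) {
		std::cout << row << " ";
	}
	std::cout << "\n";
	return 0;
}

Running the sketch prints four merged rows, biased in expectation toward the reservoir that represents the larger share of the scanned input, which is the behaviour the TODO in Sink() and Finalize() is after.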