Skip to content

Commit

Permalink
collecting samples in parallel now, now I need to figure out how to c…
Browse files Browse the repository at this point in the history
…ombine them in a proper uniform and weighted manner
  • Loading branch information
Tmonster committed Jan 13, 2023
1 parent 01c4b89 commit 904d220
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 25 deletions.
50 changes: 44 additions & 6 deletions src/execution/operator/helper/physical_reservoir_sample.cpp
@@ -1,14 +1,17 @@
#include "duckdb/execution/operator/helper/physical_reservoir_sample.hpp"
#include "duckdb/execution/reservoir_sample.hpp"
#include "iostream"

namespace duckdb {

//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
class SampleGlobalSinkState : public GlobalSinkState {
class SampleLocalSinkState : public LocalSinkState {
public:
explicit SampleGlobalSinkState(Allocator &allocator, SampleOptions &options) {
explicit SampleLocalSinkState(ClientContext &context, const PhysicalReservoirSample &sampler, SampleOptions &options) {
// Here I need to initialize the reservoir sample again from the local state.
auto &allocator = Allocator::Get(context);
if (options.is_percentage) {
auto percentage = options.sample_size.GetValue<double>();
if (percentage == 0) {
Expand All @@ -30,24 +33,59 @@ class SampleGlobalSinkState : public GlobalSinkState {
unique_ptr<BlockingSample> sample;
};

class SampleGlobalSinkState : public GlobalSinkState {
public:
explicit SampleGlobalSinkState(Allocator &allocator, SampleOptions &options) {

}

//! The lock for updating the global aggregate state
mutex lock;
//! The reservoir sample
unique_ptr<BlockingSample> sample;
vector<unique_ptr<BlockingSample>> intermediate_samples;
};

unique_ptr<GlobalSinkState> PhysicalReservoirSample::GetGlobalSinkState(ClientContext &context) const {
return make_unique<SampleGlobalSinkState>(Allocator::Get(context), *options);
}

unique_ptr<LocalSinkState> PhysicalReservoirSample::GetLocalSinkState(ExecutionContext &context) const {
return make_unique<SampleLocalSinkState>(context.client, *this, *options);
}

SinkResultType PhysicalReservoirSample::Sink(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate,
DataChunk &input) const {
auto &gstate = (SampleGlobalSinkState &)state;
if (!gstate.sample) {
auto &local_state = (SampleLocalSinkState &)lstate;
if (!local_state.sample) {
return SinkResultType::FINISHED;
}
// here we add samples to local state, then we eventually combine them in the global state using Combine()
// Why am I confused?
// When do I know when a thread collects no more data?
// Is there a way to know how much data a thread will eventually collect?

// we implement reservoir sampling without replacement and exponential jumps here
// the algorithm is adopted from the paper Weighted random sampling with a reservoir by Pavlos S. Efraimidis et al.
// note that the original algorithm is about weighted sampling; this is a simplified approach for uniform sampling
lock_guard<mutex> glock(gstate.lock);
gstate.sample->AddToReservoir(input);
local_state.sample->AddToReservoir(input);
return SinkResultType::NEED_MORE_INPUT;
}

void PhysicalReservoirSample::Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const {
auto &global_state = (SampleGlobalSinkState &)gstate;
auto &local_state = (SampleLocalSinkState &)lstate;
lock_guard<mutex> glock(global_state.lock);
global_state.intermediate_samples.push_back(move(local_state.sample));
}

SinkFinalizeType PhysicalReservoirSample::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, GlobalSinkState &gstate) const {
auto &global_state = (SampleGlobalSinkState &)gstate;
global_state.sample = move(global_state.intermediate_samples.back());
global_state.intermediate_samples.pop_back();
return SinkFinalizeType::READY;
}

//===--------------------------------------------------------------------===//
// Source
//===--------------------------------------------------------------------===//
Expand Down
21 changes: 12 additions & 9 deletions src/execution/reservoir_sample.cpp
Expand Up @@ -12,6 +12,7 @@ void ReservoirSample::AddToReservoir(DataChunk &input) {
if (sample_count == 0) {
return;
}
base_reservoir_sample.num_entries_seen_total += input.size();
// Input: A population V of n weighted items
// Output: A reservoir R with a size m
// 1: The first m items of V are inserted into R
Expand All @@ -23,14 +24,14 @@ void ReservoirSample::AddToReservoir(DataChunk &input) {
}
}
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
// find the position of next_index_to_sample relative to number of seen entries (num_seen_entries)
// find the position of next_index_to_sample relative to number of seen entries (num_entries_to_skip_b4_next_sample)
idx_t remaining = input.size();
idx_t base_offset = 0;
while (true) {
idx_t offset = base_reservoir_sample.next_index_to_sample - base_reservoir_sample.num_seen_entries;
idx_t offset = base_reservoir_sample.next_index_to_sample - base_reservoir_sample.num_entries_to_skip_b4_next_sample;
if (offset >= remaining) {
// not in this chunk! increment current count and go to the next chunk
base_reservoir_sample.num_seen_entries += remaining;
base_reservoir_sample.num_entries_to_skip_b4_next_sample += remaining;
return;
}
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
Expand Down Expand Up @@ -75,7 +76,7 @@ void ReservoirSample::ReplaceElement(DataChunk &input, idx_t index_in_chunk) {
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) {
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
reservoir_chunk->SetValue(col_idx, base_reservoir_sample.min_weighted_entry,
reservoir_chunk->SetValue(col_idx, base_reservoir_sample.min_weighted_entry_index,
input.GetValue(col_idx, index_in_chunk));
}
base_reservoir_sample.ReplaceElement();
Expand Down Expand Up @@ -144,6 +145,7 @@ ReservoirSamplePercentage::ReservoirSamplePercentage(Allocator &allocator, doubl
}

void ReservoirSamplePercentage::AddToReservoir(DataChunk &input) {
base_reservoir_sample.num_entries_seen_total += input.size();
if (current_count + input.size() > RESERVOIR_THRESHOLD) {
// we don't have enough space in our current reservoir
// first check what we still need to append to the current sample
Expand Down Expand Up @@ -228,8 +230,9 @@ void ReservoirSamplePercentage::Finalize() {
BaseReservoirSampling::BaseReservoirSampling(int64_t seed) : random(seed) {
next_index_to_sample = 0;
min_weight_threshold = 0;
min_weighted_entry = 0;
num_seen_entries = 0;
min_weighted_entry_index = 0;
num_entries_to_skip_b4_next_sample = 0;
num_entries_seen_total = 0;
}

BaseReservoirSampling::BaseReservoirSampling() : BaseReservoirSampling(-1) {
Expand Down Expand Up @@ -260,9 +263,9 @@ void BaseReservoirSampling::SetNextEntry() {
//! 6. wc +wc+1 +···+wi−1 < Xw <= wc +wc+1 +···+wi−1 +wi
//! since all our weights are 1 (uniform sampling), we can just determine the amount of elements to skip
min_weight_threshold = t_w;
min_weighted_entry = min_key.second;
min_weighted_entry_index = min_key.second;
next_index_to_sample = MaxValue<idx_t>(1, idx_t(round(x_w)));
num_seen_entries = 0;
num_entries_to_skip_b4_next_sample = 0;
}

void BaseReservoirSampling::ReplaceElement() {
Expand All @@ -275,7 +278,7 @@ void BaseReservoirSampling::ReplaceElement() {
//! we generate a random number between (min_weight_threshold, 1)
double r2 = random.NextRandom(min_weight_threshold, 1);
//! now we insert the new weight into the reservoir
reservoir_weights.push(std::make_pair(-r2, min_weighted_entry));
reservoir_weights.push(std::make_pair(-r2, min_weighted_entry_index));
//! we update the min entry with the new min entry in the reservoir
SetNextEntry();
}
Expand Down
6 changes: 3 additions & 3 deletions src/function/aggregate/holistic/reservoir_quantile.cpp
Expand Up @@ -29,7 +29,7 @@ struct ReservoirQuantileState {
}

void ReplaceElement(T &input) {
v[r_samp->min_weighted_entry] = input;
v[r_samp->min_weighted_entry_index] = input;
r_samp->ReplaceElement();
}

Expand All @@ -38,8 +38,8 @@ struct ReservoirQuantileState {
v[pos++] = element;
r_samp->InitializeReservoir(pos, len);
} else {
D_ASSERT(r_samp->next_index_to_sample >= r_samp->num_seen_entries);
if (r_samp->next_index_to_sample == r_samp->num_seen_entries) {
D_ASSERT(r_samp->next_index_to_sample >= r_samp->num_entries_to_skip_b4_next_sample);
if (r_samp->next_index_to_sample == r_samp->num_entries_to_skip_b4_next_sample) {
ReplaceElement(element);
}
}
Expand Down
Expand Up @@ -35,7 +35,10 @@ class PhysicalReservoirSample : public PhysicalOperator {
SinkResultType Sink(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate,
DataChunk &input) const override;
unique_ptr<GlobalSinkState> GetGlobalSinkState(ClientContext &context) const override;

unique_ptr<LocalSinkState> GetLocalSinkState(ExecutionContext &context) const override;
void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const override;
SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
GlobalSinkState &gstate) const override;
bool ParallelSink() const override {
return true;
}
Expand Down
5 changes: 3 additions & 2 deletions src/include/duckdb/execution/physical_operator.hpp
Expand Up @@ -186,7 +186,8 @@ class PhysicalOperator {
// Sink interface

//! The sink method is called constantly with new input, as long as new input is available. Note that this method
//! CAN be called in parallel, proper locking is needed when accessing data inside the GlobalSinkState.
//! CAN be called in parallel, proper locking is needed when accessing dat
//!a inside the GlobalSinkState.
virtual SinkResultType Sink(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate,
DataChunk &input) const;
// The combine is called when a single thread has completed execution of its part of the pipeline, it is the final
Expand All @@ -195,7 +196,7 @@ class PhysicalOperator {
virtual void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const;
//! The finalize is called when ALL threads are finished execution. It is called only once per pipeline, and is
//! entirely single threaded.
//! If Finalize returns SinkResultType::FINISHED, the sink is marked as finished
//! If Finalize returns SinkResultType::Finished, the sink is marked as finished
virtual SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
GlobalSinkState &gstate) const;

Expand Down
7 changes: 5 additions & 2 deletions src/include/duckdb/execution/reservoir_sample.hpp
Expand Up @@ -35,10 +35,13 @@ class BaseReservoirSampling {
//! The reservoir threshold of the current min entry
double min_weight_threshold;
//! The reservoir index of the current min entry
idx_t min_weighted_entry;
idx_t min_weighted_entry_index;
//! The current count towards next index (i.e. we will replace an entry in next_index - current_count tuples)
//! The number of entries "seen" before choosing one that will go in our reservoir sample.
idx_t num_seen_entries;
idx_t num_entries_to_skip_b4_next_sample;
//! when collecting a sample in parallel, we want to know how many values each thread has seen
//! so we can collect the samples from the thread local states in a uniform manner
idx_t num_entries_seen_total;
};

class BlockingSample {
Expand Down
2 changes: 1 addition & 1 deletion test/sql/sample/reservoir_testing.test
Expand Up @@ -5,7 +5,7 @@
require tpch

statement ok
CALL dbgen(sf=0.1);
CALL dbgen(sf=5);

statement ok
PRAGMA enable_verification;
Expand Down
9 changes: 8 additions & 1 deletion test/sql/sample/test_sample.test
Expand Up @@ -67,7 +67,14 @@ SELECT COUNT(*) FROM range(10000) USING SAMPLE 5
query I
SELECT COUNT(*) FROM range(1000000) USING SAMPLE 1000100
----
100000
1000000


query I
SELECT COUNT(*) FROM range(1000000) USING SAMPLE 2
----
2


# test sample with multiple columns
# we insert the same data in the entire column
Expand Down

0 comments on commit 904d220

Please sign in to comment.