Skip to content

Commit

Permalink
intermediate commit, will fix other spots later
Browse files Browse the repository at this point in the history
  • Loading branch information
Tmonster committed Jan 16, 2023
1 parent 3676737 commit 8a01b32
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 8 deletions.
47 changes: 39 additions & 8 deletions src/execution/operator/helper/physical_reservoir_sample.cpp
Expand Up @@ -36,14 +36,26 @@ class SampleLocalSinkState : public LocalSinkState {
//! Global sink state of the reservoir sample operator. Holds the final sample together with the
//! local samples that Combine pushes into "intermediate_samples".
class SampleGlobalSinkState : public GlobalSinkState {
public:
	//! Creates the global sample described by "options": a percentage-based sample when
	//! options.is_percentage is set, a fixed-size sample otherwise.
	//! A sample size (or percentage) of zero leaves "sample" unset (nullptr).
	explicit SampleGlobalSinkState(Allocator &allocator, SampleOptions &options) {
		if (options.is_percentage) {
			auto percentage = options.sample_size.GetValue<double>();
			if (percentage == 0) {
				// nothing to sample: "sample" stays nullptr
				return;
			}
			sample = make_unique<ReservoirSamplePercentage>(allocator, percentage, options.seed);
		} else {
			auto size = options.sample_size.GetValue<int64_t>();
			if (size == 0) {
				// nothing to sample: "sample" stays nullptr
				return;
			}
			sample = make_unique<ReservoirSample>(allocator, size, options.seed);
		}
	}

	//! The lock for updating the global aggregate state
	mutex lock;
	//! The reservoir sample
	unique_ptr<BlockingSample> sample;
	//! The samples handed over by Combine, each paired with its pop count and weight.
	//! (The former vector<unique_ptr<BlockingSample>> declaration of this member was dropped:
	//! declaring the same member twice does not compile.)
	vector<intermediate_sample_and_pop_count> intermediate_samples;
};

unique_ptr<GlobalSinkState> PhysicalReservoirSample::GetGlobalSinkState(ClientContext &context) const {
Expand Down Expand Up @@ -76,23 +88,42 @@ void PhysicalReservoirSample::Combine(ExecutionContext &context, GlobalSinkState
auto &global_state = (SampleGlobalSinkState &)gstate;
auto &local_state = (SampleLocalSinkState &)lstate;
lock_guard<mutex> glock(global_state.lock);
global_state.intermediate_samples.push_back(move(local_state.sample));
global_state.intermediate_samples.push_back(intermediate_sample_and_pop_count(move(local_state.sample), 0));
}

//! Finalizes the sink: computes, for every intermediate sample collected in Combine, the fraction of
//! all seen tuples it accounts for, and seeds the global sample from the last intermediate sample.
//! The weighted merge of the remaining intermediate samples is still work in progress (see the TODOs).
//! Always returns SinkFinalizeType::READY.
SinkFinalizeType PhysicalReservoirSample::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, GlobalSinkState &gstate) const {
	auto &global_state = (SampleGlobalSinkState &)gstate;
	auto &intermediate_samples = global_state.intermediate_samples;
	if (intermediate_samples.empty()) {
		// Nothing was combined: calling back() on an empty vector is undefined behavior,
		// so keep the global sample as it is.
		return SinkFinalizeType::READY;
	}

	// Total number of tuples seen by all intermediate samples.
	// idx_t instead of int: the per-sample counters are summed without narrowing.
	idx_t total_count = 0;
	for (auto &sample : intermediate_samples) {
		total_count += sample.isample->base_reservoir_sample.num_entries_seen_total;
	}

	// Number of tuples the final sample should contain.
	idx_t sample_count;
	if (options->is_percentage) {
		// NOTE(review): this treats "percentage" as a fraction of total_count — confirm it is not
		// expressed on a 0-100 scale.
		auto percentage = options->sample_size.GetValue<double>();
		sample_count = percentage * total_count;
	} else {
		auto size = options->sample_size.GetValue<int64_t>();
		sample_count = size;
	}

	// The weight of an intermediate sample is the fraction of all seen tuples it accounts for
	// (num_entries_seen_total / total_count). The division is done in floating point: an integer
	// division would truncate every weight to 0. A zero total yields weight 0 instead of dividing by zero.
	for (auto &sample : intermediate_samples) {
		auto entries_seen = sample.isample->base_reservoir_sample.num_entries_seen_total;
		sample.weight = total_count == 0 ? 0.0 : static_cast<double>(entries_seen) / static_cast<double>(total_count);
	}

	for (idx_t j = 0; j < sample_count; j++) {
		// TODO: generate random number in between 0 and sample_count
		// TODO: check which sample should be popped from and increase the sample pop count
	}

	// Seed the global sample with the last intermediate sample; only the owned BlockingSample
	// is moved out of the (sample, pop count, weight) record.
	global_state.sample = move(intermediate_samples.back().isample);
	intermediate_samples.pop_back();

	for (idx_t k = 0; k < intermediate_samples.size(); k++) {
		// TODO: add chunks to the global sample until the pop count for the sample reaches 0
		// (call GetChunk on the intermediate sample until the proper amount of data was added)
	}

	intermediate_samples.clear();
	return SinkFinalizeType::READY;
}

Expand Down
19 changes: 19 additions & 0 deletions src/execution/reservoir_sample.cpp
Expand Up @@ -8,6 +8,11 @@ ReservoirSample::ReservoirSample(Allocator &allocator, idx_t sample_count, int64
reservoir_initialized(false) {
}

//void ReservoirSample::AddToReservoir(DataChunk &input, idx_t max_amount_to_add = STANDARD_VECTOR_SIZE) {
//
//}


void ReservoirSample::AddToReservoir(DataChunk &input) {
if (sample_count == 0) {
return;
Expand Down Expand Up @@ -43,6 +48,20 @@ void ReservoirSample::AddToReservoir(DataChunk &input) {
}
}

//! Moves up to "samples_to_merge" tuples from "other" into this sample.
//! Chunks are fetched from "other" with GetChunk (which consumes them) and added through
//! AddToReservoir; the last chunk is truncated so that no more than "samples_to_merge" tuples are
//! offered. Merging stops early when "other" has no more chunks to hand out.
void BlockingSample::Merge(unique_ptr<BlockingSample> &other, idx_t samples_to_merge) {
	if (!other) {
		return;
	}
	// idx_t rather than int: the counter is compared against and added to idx_t values
	idx_t num_samples_merged = 0;
	while (num_samples_merged < samples_to_merge) {
		auto chunk = other->GetChunk();
		if (!chunk || chunk->size() == 0) {
			// GetChunk returns nullptr once the sample is drained; an empty chunk is treated the
			// same way so that the loop always terminates
			break;
		}
		// Truncate the chunk to the number of tuples that are still missing
		// (samples_to_merge - num_samples_merged), never to a wrapped-around unsigned difference.
		const idx_t remaining = samples_to_merge - num_samples_merged;
		if (chunk->size() > remaining) {
			chunk->SetCardinality(remaining);
		}
		num_samples_merged += chunk->size();
		AddToReservoir(*chunk);
	}
}

unique_ptr<DataChunk> ReservoirSample::GetChunk() {
if (num_added_samples == 0) {
return nullptr;
Expand Down
14 changes: 14 additions & 0 deletions src/include/duckdb/execution/reservoir_sample.hpp
Expand Up @@ -15,6 +15,16 @@

namespace duckdb {

class BlockingSample;

//! Bookkeeping record for one intermediate sample: the sample itself, a pop count and a weight.
struct intermediate_sample_and_pop_count {
	//! Takes ownership of "isample_p" and stores "pop_count_p"; the weight starts at 0.
	intermediate_sample_and_pop_count(unique_ptr<BlockingSample> isample_p, idx_t pop_count_p)
	    : isample(move(isample_p)), pop_count(pop_count_p) {
	}

	//! The owned intermediate sample
	unique_ptr<BlockingSample> isample;
	//! The pop count associated with this sample
	idx_t pop_count;
	//! The weight assigned to this sample (0 until it is set)
	double weight = 0;
};

class BaseReservoirSampling {
public:
explicit BaseReservoirSampling(int64_t seed);
Expand Down Expand Up @@ -54,6 +64,10 @@ class BlockingSample {
//! Add a chunk of data to the sample
virtual void AddToReservoir(DataChunk &input) = 0;

//! When collecting samples in parallel, merge samples to create
//! the final sample for the column
void Merge(unique_ptr<BlockingSample> &other, idx_t samples_to_merge);

//! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the
//! sample is completely built.
virtual unique_ptr<DataChunk> GetChunk() = 0;
Expand Down

0 comments on commit 8a01b32

Please sign in to comment.