Skip to content

Commit

Permalink
reservoir sample works. but for large cardinalities and high percenta…
Browse files Browse the repository at this point in the history
…ges no
  • Loading branch information
Tmonster committed Dec 4, 2023
1 parent ddcea54 commit 04d4c0d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 15 deletions.
11 changes: 3 additions & 8 deletions src/execution/operator/helper/physical_reservoir_sample.cpp
Expand Up @@ -95,21 +95,16 @@ SinkResultType PhysicalReservoirSample::Sink(ExecutionContext &context, DataChun
D_ASSERT(local_state.sample);
// we implement reservoir sampling without replacement and exponential jumps here
// the algorithm is adopted from the paper Weighted random sampling with a reservoir by Pavlos S. Efraimidis et al.
// note that the original algorithm is about weighted sampling; this is a simplified approach for uniform sampling
lock_guard<mutex> glock(global_state.lock);
global_state.sample->AddToReservoir(chunk);
// note that the original algorithm is about weighted sampling; this is a simplified approach for uniform sampling;
local_state.sample->AddToReservoir(chunk);
return SinkResultType::NEED_MORE_INPUT;
}

SinkCombineResultType PhysicalReservoirSample::Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const {
auto &global_state = input.global_state.Cast<SampleGlobalSinkState>();
auto &local_state = input.local_state.Cast<SampleLocalSinkState>();
lock_guard<mutex> glock(global_state.lock);
// for some reason here local_state.sample doesn't have the info anymore.
if (local_state.sample) {
// check if the local state sample even did sampling.
}
global_state.intermediate_samples.push_back(std::move(global_state.sample));
global_state.intermediate_samples.push_back(std::move(local_state.sample));
return SinkCombineResultType::FINISHED;
}

Expand Down
22 changes: 18 additions & 4 deletions src/execution/reservoir_sample.cpp
Expand Up @@ -105,8 +105,23 @@ unique_ptr<DataChunk> ReservoirSample::GetChunk() {
num_added_samples = samples_remaining;
return ret;
}
// TODO: Why do I need to put another selection vector over this one?
auto ret = make_uniq<DataChunk>();
auto samples_remaining = 0;
auto reservoir_types = reservoir_chunk->GetTypes();
SelectionVector sel(num_added_samples);
for (idx_t i = 0; i < num_added_samples; i++) {
sel.set_index(i, i);
}
ret->Initialize(allocator, reservoir_types.begin(), reservoir_types.end(), num_added_samples);
reservoir_chunk->Slice(*ret, sel, num_added_samples);
ret->SetCardinality(num_added_samples);
// reduce capacity and cardinality of the sample data chunk
reservoir_chunk->SetCardinality(samples_remaining);
num_added_samples = 0;
return std::move(reservoir_chunk);
return ret;
// num_added_samples = 0;
// return std::move(reservoir_chunk);
}

void ReservoirSample::ReplaceElement(DataChunk &input, idx_t index_in_chunk, double with_weight) {
Expand Down Expand Up @@ -193,9 +208,6 @@ void ReservoirSamplePercentage::Merge(unique_ptr<BlockingSample> &other) {
//! We are now merging all the samples. 80% of every sample should equal 80%
//! of all rows so we set sample percentage to 1, which will means every tuple
//! in the added chunks will be added
if (!is_finalized) {
Finalize();
}
sample_percentage = 1;
auto chunk = other->GetChunk();
while (chunk) {
Expand Down Expand Up @@ -292,6 +304,8 @@ void ReservoirSamplePercentage::Finalize() {
} else {
finished_samples.push_back(std::move(current_sample));
}
// when finalizing, current_sample is null. All samples are now in finished samples.
current_sample = nullptr;
is_finalized = true;
}

Expand Down
27 changes: 24 additions & 3 deletions test/sql/sample/reservoir_testing.test
Expand Up @@ -3,16 +3,37 @@
# group: [sample]

statement ok
CREATE TABLE t1 as select (random() * 1000)::int a from range(100);
CREATE TABLE t1 as select (random() * 1000)::int a from range(1000);

#mode output_result
query I
SELECT count(*) from t1 using sample 80 percent (reservoir);
----
800

statement ok
Insert into t1 select range a from range(9000);

query I
SELECT count(*) from t1 using sample 80 percent (reservoir);
select count(*) from t1 using sample 80 percent (reservoir);
----
8000

statement ok
Insert into t1 select range a from range(90000);

query I
select count(*) from t1 using sample 80 percent (reservoir);
----
80000

statement ok
Insert into t1 select range a from range(900000);

query I
select count(*) from t1 using sample 80 percent (reservoir);
----
800000

mode skip

query I
Expand Down

0 comments on commit 04d4c0d

Please sign in to comment.