Skip to content

Commit

Permalink
some debugging statements
Browse files Browse the repository at this point in the history
  • Loading branch information
Tmonster committed Dec 5, 2023
1 parent 4ad877c commit d378cc7
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 57 deletions.
17 changes: 15 additions & 2 deletions src/execution/operator/helper/physical_reservoir_sample.cpp
Expand Up @@ -120,6 +120,18 @@ static void PrintSampleCount(vector<unique_ptr<BlockingSample>> &samples) {
std::cout << "samples in reservoir is " << total_count << std::endl;
}

//! Debugging helper: writes to stdout the number of tuples this percentage sample has seen.
//! The total is the current sample's `current_count` plus `num_entries_seen_total` of the
//! base reservoir sample of every entry in `finished_samples`.
//! In assertion-enabled builds, every finished sample is checked to have seen exactly 100000 tuples.
static void PrintAllSeenSamples(ReservoirSamplePercentage &sample_percentage) {
	// begin with what the still-running (current) sample has counted
	idx_t tuples_seen = sample_percentage.current_count;
	for (auto &finished : sample_percentage.finished_samples) {
		const auto seen_by_finished = finished->base_reservoir_sample.num_entries_seen_total;
		tuples_seen += seen_by_finished;
		// sanity check used while debugging: each finished sample must have seen 100000 tuples
		D_ASSERT(seen_by_finished == 100000);
	}
	std::cout << "this sample has seen " << tuples_seen << " tuples" << std::endl;
}


SinkFinalizeType PhysicalReservoirSample::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
OperatorSinkFinalizeInput &input) const {
auto &global_state = input.global_state.Cast<SampleGlobalSinkState>();
Expand All @@ -143,11 +155,12 @@ SinkFinalizeType PhysicalReservoirSample::Finalize(Pipeline &pipeline, Event &ev
auto ls = std::move(global_state.intermediate_samples.back());
auto &percentage_last_sample = ls->Cast<ReservoirSamplePercentage>();
global_state.intermediate_samples.pop_back();

PrintAllSeenSamples(percentage_last_sample);
// merge to the rest .
for (auto &sample : global_state.intermediate_samples) {
// combine the unfinished samples
auto &intermediate_percentage_sample = sample->Cast<ReservoirSamplePercentage>();
PrintAllSeenSamples(intermediate_percentage_sample);

while (!intermediate_percentage_sample.finished_samples.empty()) {
percentage_last_sample.finished_samples.push_back(std::move(intermediate_percentage_sample.finished_samples.get(0)));
Expand All @@ -158,7 +171,7 @@ SinkFinalizeType PhysicalReservoirSample::Finalize(Pipeline &pipeline, Event &ev
percentage_last_sample.Finalize();
global_state.sample = std::move(ls);
global_state.intermediate_samples.clear();
} else if (sampling_type == ReservoirSamplingType::RESERVOIR_SAMPLE){
} else if (sampling_type == ReservoirSamplingType::RESERVOIR_SAMPLE) {
auto largest_sample_index = 0;
auto cur_largest_sample = global_state.intermediate_samples.at(largest_sample_index)->get_sample_count();
for (idx_t i = 0; i < global_state.intermediate_samples.size(); i++) {
Expand Down
12 changes: 9 additions & 3 deletions src/execution/reservoir_sample.cpp
Expand Up @@ -49,6 +49,12 @@ void ReservoirSample::AddToReservoir(DataChunk &input) {
//! Merging unfinished samples is not supported for a plain reservoir sample:
//! this function always throws an InternalException.
//! Note that `other` is still cast to a ReservoirSample and finalized before the exception is raised.
void ReservoirSample::MergeUnfinishedSamples(unique_ptr<BlockingSample> &other) {
	auto &reservoir_sample = other->Cast<ReservoirSample>();
	reservoir_sample.Finalize();
	throw InternalException("a normal reservoir sample should not merge unfinished samples");
	// Disabled merge logic (unreachable because of the throw above), kept for reference:
	// auto chunk = reservoir_sample.GetChunk();
	// while (chunk) {
	// 	AddToReservoir(*chunk);
	// 	chunk = other->GetChunk();
	// }
}


Expand Down Expand Up @@ -254,9 +260,9 @@ ReservoirSamplePercentage::ReservoirSamplePercentage(Allocator &allocator, doubl

void ReservoirSamplePercentage::MergeUnfinishedSamples(unique_ptr<BlockingSample> &other) {
auto &percentage_sample = other->Cast<ReservoirSamplePercentage>();
auto actual_seen = percentage_sample.current_count;
auto in_sample = percentage_sample.current_sample->num_added_samples;
auto difference = actual_seen - in_sample;
auto other_seen = percentage_sample.current_count;
auto other_added = percentage_sample.current_sample->num_added_samples;
auto difference = other_seen - other_added;
auto chunk = other->GetChunk();
while (chunk) {
AddToReservoir(*chunk);
Expand Down
106 changes: 54 additions & 52 deletions test/sql/sample/reservoir_testing_percentage.test
Expand Up @@ -2,49 +2,49 @@
# description: Test SAMPLE keyword
# group: [sample]

loop i 1 8
#loop i 1 8

statement ok
pragma threads=${i};
pragma threads=3;

statement ok
CREATE or replace TABLE t1 as select range a from range(1000);

query I
SELECT count(*) from t1 using sample 0 percent (reservoir);
----
0

query I
SELECT count(*) from t1 using sample 10 percent (reservoir);
----
100

query I
SELECT count(*) from t1 using sample 20 percent (reservoir);
----
200

query I
SELECT count(*) from t1 using sample 80 percent (reservoir);
----
800


query I
SELECT count(*) from t1 using sample 100 percent (reservoir);
----
1000
#
#query I
#SELECT count(*) from t1 using sample 0 percent (reservoir);
#----
#0
#
#query I
#SELECT count(*) from t1 using sample 10 percent (reservoir);
#----
#100
#
#query I
#SELECT count(*) from t1 using sample 20 percent (reservoir);
#----
#200
#
#query I
#SELECT count(*) from t1 using sample 80 percent (reservoir);
#----
#800
#
#
#query I
#SELECT count(*) from t1 using sample 100 percent (reservoir);
#----
#1000


statement ok
Insert into t1 select range a from range(9000);


query I
select count(*) from t1 using sample 80 percent (reservoir);
----
8000
#query I
#select count(*) from t1 using sample 80 percent (reservoir);
#----
#8000

statement ok
Insert into t1 select range a from range(90000);
Expand All @@ -53,16 +53,17 @@ Insert into t1 select range a from range(90000);
statement ok
Insert into t1 select range a from range(900000);

#
#query I
#select count(*) from t1 using sample 20 percent (reservoir);
#----
#200000
#

query I
select count(*) from t1 using sample 20 percent (reservoir);
----
200000

query I
select count(*) from t1 using sample 30 percent (reservoir);
----
300000
#query I
#select count(*) from t1 using sample 30 percent (reservoir);
#----
#300000

query I
select count(*) from t1 using sample 40 percent (reservoir);
Expand All @@ -74,14 +75,15 @@ select count(*) from t1 using sample 50 percent (reservoir);
----
500000

query I
select count(*) from t1 using sample 60 percent (reservoir);
----
600000

query I
select count(*) from t1 using sample 70 percent (reservoir);
----
700000

endloop
#
#query I
#select count(*) from t1 using sample 60 percent (reservoir);
#----
#600000
#
#query I
#select count(*) from t1 using sample 70 percent (reservoir);
#----
#700000

#endloop

0 comments on commit d378cc7

Please sign in to comment.