From 605520fb5973fbb40edbbddb3fa4fc666acebc06 Mon Sep 17 00:00:00 2001 From: Tom Ebergen Date: Mon, 19 Dec 2022 21:15:38 +0100 Subject: [PATCH] its getting better but still getting memory errors --- .../helper/physical_reservoir_sample.cpp | 1 + src/execution/reservoir_sample.cpp | 99 ++++++++++--------- .../aggregate/holistic/reservoir_quantile.cpp | 6 +- .../duckdb/common/types/data_chunk.hpp | 5 +- .../helper/physical_reservoir_sample.hpp | 4 +- .../duckdb/execution/reservoir_sample.hpp | 12 +-- test/sql/sample/reservoir_testing.test | 20 ++++ 7 files changed, 88 insertions(+), 59 deletions(-) create mode 100644 test/sql/sample/reservoir_testing.test diff --git a/src/execution/operator/helper/physical_reservoir_sample.cpp b/src/execution/operator/helper/physical_reservoir_sample.cpp index fbe35a8e8cd..213296f2d27 100644 --- a/src/execution/operator/helper/physical_reservoir_sample.cpp +++ b/src/execution/operator/helper/physical_reservoir_sample.cpp @@ -61,6 +61,7 @@ void PhysicalReservoirSample::GetData(ExecutionContext &context, DataChunk &chun if (!sample_chunk) { return; } + chunk.Move(*sample_chunk); } diff --git a/src/execution/reservoir_sample.cpp b/src/execution/reservoir_sample.cpp index 0c0c6e0d611..b8e2042b398 100644 --- a/src/execution/reservoir_sample.cpp +++ b/src/execution/reservoir_sample.cpp @@ -4,7 +4,7 @@ namespace duckdb { ReservoirSample::ReservoirSample(Allocator &allocator, idx_t sample_count, int64_t seed) - : BlockingSample(seed), sample_count(sample_count), reservoir(allocator) { + : BlockingSample(seed), allocator(allocator), num_added_samples(0), sample_count(sample_count), reservoir_initialized(false) { } void ReservoirSample::AddToReservoir(DataChunk &input) { @@ -39,9 +39,37 @@ void ReservoirSample::AddToReservoir(DataChunk &input) { } } -void ReservoirSample::GetChunk() { + + + + +unique_ptr ReservoirSample::GetChunk() { //TODO: The calling functions need to be updated because maybe we don't want to delete everything? 
- return make_unique(reservoir); + if (num_added_samples == 0) { + return nullptr; + } + + if (reservoir_dchunk->size() > STANDARD_VECTOR_SIZE) { + // get from the back + auto ret = make_unique(); + auto samples_remaining = num_added_samples - STANDARD_VECTOR_SIZE; + auto reservoir_types = reservoir_dchunk->GetTypes(); + SelectionVector sel(STANDARD_VECTOR_SIZE); + for (idx_t i = samples_remaining; i < num_added_samples; i++) { + sel.set_index(i - samples_remaining, i); + } + ret->Initialize(allocator, reservoir_types.begin(), reservoir_types.end(), STANDARD_VECTOR_SIZE); + reservoir_dchunk->Slice(*ret, sel, STANDARD_VECTOR_SIZE); + ret->SetCardinality(STANDARD_VECTOR_SIZE); + // reduce capacity and cardinality of the sample data chunk + reservoir_dchunk->SetCardinality(samples_remaining); + reservoir_dchunk->SetCapacity(samples_remaining); + num_added_samples = samples_remaining; + return ret; + + } + num_added_samples = 0; + return move(reservoir_dchunk); } void ReservoirSample::ReplaceElement(DataChunk &input, idx_t index_in_chunk) { @@ -49,7 +77,7 @@ void ReservoirSample::ReplaceElement(DataChunk &input, idx_t index_in_chunk) { // 8. The item in R with the minimum key is replaced by item vi for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) { // TODO: Make this vectorized in some way. IDK how yet. 
- reservoir_dchunk.SetValue(col_idx, base_reservoir_sample.min_weighted_entry, input.GetValue(col_idx, index_in_chunk)); + reservoir_dchunk->SetValue(col_idx, base_reservoir_sample.min_weighted_entry, input.GetValue(col_idx, index_in_chunk)); } base_reservoir_sample.ReplaceElement(); } @@ -58,66 +86,41 @@ idx_t ReservoirSample::SamplesInReservoir() { return num_added_samples; } - -void ReservoirSample::ReservoirMergeChunk(DataChunk &input) { - if (input.size() == 0) { - return; - } - input.Verify(); - D_ASSERT(types.size() == input.ColumnCount()); - auto new_types = input.GetTypes(); - for (idx_t i = 0; i < types.size(); i++) { - if (new_types[i] != types[i]) { - throw TypeMismatchException(new_types[i], types[i], "Reservoir Sampler type mismatch when combining rows"); - } - // TODO: handel this - if (types[i].InternalType() == PhysicalType::LIST) { - // need to check all the chunks because they can have only-null list entries -// for (auto &chunk : chunks) { -// auto &chunk_vec = chunk->data[i]; -// auto &new_vec = new_chunk.data[i]; -// auto &chunk_type = chunk_vec.GetType(); -// auto &new_type = new_vec.GetType(); -// if (chunk_type != new_type) { -// throw TypeMismatchException(chunk_type, new_type, "Type mismatch when combining lists"); -// } -// } - } - // TODO check structs, too - } - for (idx_t i = 0; i < new_types.size(); i++) { - D_ASSERT(reservoir[i].GetVectorType() == VectorType::FLAT_VECTOR); - VectorOperations::Copy(input.data[i], reservoir[i], input.size(), num_added_samples, input.size()); - } - num_added_samples += input.size(); - D_ASSERT(num_added_samples <= sample_count); - +void ReservoirSample::InitializeReservoir(DataChunk &input) { + reservoir_dchunk = make_unique(); + reservoir_dchunk->Initialize(allocator, input.GetTypes(), sample_count); + reservoir_initialized = true; } idx_t ReservoirSample::FillReservoir(DataChunk &input) { idx_t chunk_count = input.size(); input.Flatten(); + D_ASSERT(num_added_samples <= sample_count); // we have not: 
append to the reservoir idx_t required_count; - idx_t added_samples = SamplesInReservoir(); - if (added_samples + chunk_count >= sample_count) { + if (num_added_samples + chunk_count >= sample_count) { // have to limit the count of the chunk - required_count = sample_count - added_samples; + required_count = sample_count - num_added_samples; } else { // we copy the entire chunk required_count = chunk_count; } - // instead of copying we just change the pointer in the current chunk input.SetCardinality(required_count); - // TODO: here we need to fix how data is appended -// ReservoirMergeChunk(input); - reservoir_dchunk.Append(input, false); - base_reservoir_sample.InitializeReservoir(SamplesInReservoir(), sample_count); + // initialize the reservoir + if (!reservoir_initialized) { + InitializeReservoir(input); + } + reservoir_dchunk->Append(input, false, nullptr, required_count); + base_reservoir_sample.InitializeReservoir(reservoir_dchunk->size(), sample_count); + + num_added_samples += required_count; + reservoir_dchunk->SetCardinality(num_added_samples); + - // check if there are still elements remaining - // this happens if we are on a boundary + // check if there are still elements remaining in the Input data chunk that should be + // randomly sampled and potentially added. 
This happens if we are on a boundary // for example, input.size() is 1024, but our sample size is 10 if (required_count == chunk_count) { // we are done here diff --git a/src/function/aggregate/holistic/reservoir_quantile.cpp b/src/function/aggregate/holistic/reservoir_quantile.cpp index 3c0dc86545f..7a91a8dc134 100644 --- a/src/function/aggregate/holistic/reservoir_quantile.cpp +++ b/src/function/aggregate/holistic/reservoir_quantile.cpp @@ -29,7 +29,7 @@ struct ReservoirQuantileState { } void ReplaceElement(T &input) { - v[r_samp->min_entry] = input; + v[r_samp->min_weighted_entry] = input; r_samp->ReplaceElement(); } @@ -38,8 +38,8 @@ struct ReservoirQuantileState { v[pos++] = element; r_samp->InitializeReservoir(pos, len); } else { - D_ASSERT(r_samp->next_index >= r_samp->current_count); - if (r_samp->next_index == r_samp->current_count) { + D_ASSERT(r_samp->next_index_to_sample >= r_samp->num_seen_entries); + if (r_samp->next_index_to_sample == r_samp->num_seen_entries) { ReplaceElement(element); } } diff --git a/src/include/duckdb/common/types/data_chunk.hpp b/src/include/duckdb/common/types/data_chunk.hpp index b387a73a29e..004bc028c57 100644 --- a/src/include/duckdb/common/types/data_chunk.hpp +++ b/src/include/duckdb/common/types/data_chunk.hpp @@ -69,6 +69,9 @@ class DataChunk { inline void SetCapacity(const DataChunk &other) { SetCapacity(other.capacity); } + inline idx_t GetCapacity() { + return capacity; + } DUCKDB_API Value GetValue(idx_t col_idx, idx_t index) const; DUCKDB_API void SetValue(idx_t col_idx, idx_t index, const Value &val); @@ -112,7 +115,7 @@ class DataChunk { DUCKDB_API void Copy(DataChunk &other, const SelectionVector &sel, const idx_t source_count, const idx_t offset = 0) const; - //! Splits the DataChunk in two + //! Splits the DataChunk in two column wise DUCKDB_API void Split(DataChunk &other, idx_t split_idx); //! Fuses a DataChunk onto the right of this one, and destroys the other. Inverse of Split. 
diff --git a/src/include/duckdb/execution/operator/helper/physical_reservoir_sample.hpp b/src/include/duckdb/execution/operator/helper/physical_reservoir_sample.hpp index 591c5c51d71..c7ceefabec1 100644 --- a/src/include/duckdb/execution/operator/helper/physical_reservoir_sample.hpp +++ b/src/include/duckdb/execution/operator/helper/physical_reservoir_sample.hpp @@ -13,7 +13,9 @@ namespace duckdb { -//! PhysicalReservoirSample represents a sample taken using reservoir sampling, which is a blocking sampling method +//! PhysicalReservoirSample represents a sample taken using reservoir sampling, +//! which is a blocking sampling method + class PhysicalReservoirSample : public PhysicalOperator { public: PhysicalReservoirSample(vector types, unique_ptr options, idx_t estimated_cardinality) diff --git a/src/include/duckdb/execution/reservoir_sample.hpp b/src/include/duckdb/execution/reservoir_sample.hpp index e5d1e1489ad..5a48918e7f3 100644 --- a/src/include/duckdb/execution/reservoir_sample.hpp +++ b/src/include/duckdb/execution/reservoir_sample.hpp @@ -53,7 +53,7 @@ class BlockingSample { //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the // sample is completely built. - virtual void GetChunk() = 0; + virtual unique_ptr GetChunk() = 0; protected: //! The reservoir sampling @@ -71,24 +71,24 @@ class ReservoirSample : public BlockingSample { //! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the //! sample is completely built. - void GetChunk() override; + unique_ptr GetChunk() override; private: //! Replace a single element of the input void ReplaceElement(DataChunk &input, idx_t index_in_chunk); + void InitializeReservoir(DataChunk &input); idx_t SamplesInReservoir(); - void ReservoirMergeChunk(DataChunk &input); //! 
Fills the reservoir up until sample_count entries, returns how many entries are still required idx_t FillReservoir(DataChunk &input); private: + Allocator &allocator; idx_t num_added_samples; //! The size of the reservoir sample idx_t sample_count; - vector types; + bool reservoir_initialized; //! The current reservoir - vector reservoir; - DataChunk reservoir_dchunk; + unique_ptr reservoir_dchunk ; }; //! The reservoir sample sample_size class maintains a streaming sample of variable size diff --git a/test/sql/sample/reservoir_testing.test b/test/sql/sample/reservoir_testing.test new file mode 100644 index 00000000000..c512a7b588e --- /dev/null +++ b/test/sql/sample/reservoir_testing.test @@ -0,0 +1,20 @@ +# name: test/sql/sample/reservoir_testing.test +# description: Test SAMPLE keyword +# group: [sample] + +require tpch + +statement ok +CALL dbgen(sf=0.1); + +statement ok +PRAGMA enable_verification; + +statement ok +PRAGMA threads=1; + +query I +SELECT COUNT(*) FROM lineitem USING SAMPLE 4096 ROWS +---- +4096 +