Skip to content

Commit

Permalink
It's getting better, but still getting memory errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Tmonster committed Dec 19, 2022
1 parent 97a491e commit 605520f
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 59 deletions.
Expand Up @@ -61,6 +61,7 @@ void PhysicalReservoirSample::GetData(ExecutionContext &context, DataChunk &chun
if (!sample_chunk) {
return;
}

chunk.Move(*sample_chunk);
}

Expand Down
99 changes: 51 additions & 48 deletions src/execution/reservoir_sample.cpp
Expand Up @@ -4,7 +4,7 @@
namespace duckdb {

//! Creates an empty reservoir sample.
//! \param allocator stored by reference and used later to allocate the reservoir chunk
//! \param sample_count the size of the reservoir sample
//! \param seed forwarded to the BlockingSample base class
//! The reservoir chunk itself is not allocated here; reservoir_initialized starts out false.
ReservoirSample::ReservoirSample(Allocator &allocator, idx_t sample_count, int64_t seed)
    : BlockingSample(seed), allocator(allocator), num_added_samples(0), sample_count(sample_count),
      reservoir_initialized(false) {
}

void ReservoirSample::AddToReservoir(DataChunk &input) {
Expand Down Expand Up @@ -39,17 +39,45 @@ void ReservoirSample::AddToReservoir(DataChunk &input) {
}
}

void ReservoirSample::GetChunk() {




//! Fetches the next chunk of sampled rows, or nullptr when the reservoir holds no rows.
//! This method is destructive: the returned rows are removed from the reservoir.
//! While more than STANDARD_VECTOR_SIZE rows remain, the last STANDARD_VECTOR_SIZE rows are returned;
//! otherwise ownership of the whole reservoir chunk is handed to the caller.
unique_ptr<DataChunk> ReservoirSample::GetChunk() {
	if (!reservoir_dchunk || num_added_samples == 0) {
		return nullptr;
	}

	if (reservoir_dchunk->size() > STANDARD_VECTOR_SIZE) {
		// take the rows from the back, so the rows that stay behind keep their positions [0, samples_remaining)
		auto ret = make_unique<DataChunk>();
		auto samples_remaining = num_added_samples - STANDARD_VECTOR_SIZE;
		auto reservoir_types = reservoir_dchunk->GetTypes();
		SelectionVector sel(STANDARD_VECTOR_SIZE);
		for (idx_t i = samples_remaining; i < num_added_samples; i++) {
			sel.set_index(i - samples_remaining, i);
		}
		ret->Initialize(allocator, reservoir_types.begin(), reservoir_types.end(), STANDARD_VECTOR_SIZE);
		// Slice fills the chunk it is called on from its argument: the result chunk has to be sliced out of
		// the reservoir. Calling it the other way around overwrites the reservoir with the empty result chunk.
		ret->Slice(*reservoir_dchunk, sel, STANDARD_VECTOR_SIZE);
		ret->SetCardinality(STANDARD_VECTOR_SIZE);
		// reduce capacity and cardinality of the sample data chunk
		reservoir_dchunk->SetCardinality(samples_remaining);
		reservoir_dchunk->SetCapacity(samples_remaining);
		num_added_samples = samples_remaining;
		return ret;
	}
	// at most one vector left: give the reservoir chunk itself to the caller
	num_added_samples = 0;
	return move(reservoir_dchunk);
}

//! Overwrites the reservoir row at base_reservoir_sample.min_weighted_entry with row index_in_chunk of input,
//! then calls base_reservoir_sample.ReplaceElement().
void ReservoirSample::ReplaceElement(DataChunk &input, idx_t index_in_chunk) {
	// replace the entry in the reservoir
	// 8. The item in R with the minimum key is replaced by item vi
	for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) {
		// TODO: vectorize this; values are currently copied one at a time through GetValue/SetValue
		reservoir_dchunk->SetValue(col_idx, base_reservoir_sample.min_weighted_entry,
		                           input.GetValue(col_idx, index_in_chunk));
	}
	base_reservoir_sample.ReplaceElement();
}
Expand All @@ -58,66 +86,41 @@ idx_t ReservoirSample::SamplesInReservoir() {
return num_added_samples;
}


void ReservoirSample::ReservoirMergeChunk(DataChunk &input) {
if (input.size() == 0) {
return;
}
input.Verify();
D_ASSERT(types.size() == input.ColumnCount());
auto new_types = input.GetTypes();
for (idx_t i = 0; i < types.size(); i++) {
if (new_types[i] != types[i]) {
throw TypeMismatchException(new_types[i], types[i], "Reservoir Sampler type mismatch when combining rows");
}
// TODO: handle this
if (types[i].InternalType() == PhysicalType::LIST) {
// need to check all the chunks because they can have only-null list entries
// for (auto &chunk : chunks) {
// auto &chunk_vec = chunk->data[i];
// auto &new_vec = new_chunk.data[i];
// auto &chunk_type = chunk_vec.GetType();
// auto &new_type = new_vec.GetType();
// if (chunk_type != new_type) {
// throw TypeMismatchException(chunk_type, new_type, "Type mismatch when combining lists");
// }
// }
}
// TODO check structs, too
}
for (idx_t i = 0; i < new_types.size(); i++) {
D_ASSERT(reservoir[i].GetVectorType() == VectorType::FLAT_VECTOR);
VectorOperations::Copy(input.data[i], reservoir[i], input.size(), num_added_samples, input.size());
}
num_added_samples += input.size();
D_ASSERT(num_added_samples <= sample_count);

//! Allocates the reservoir chunk using the column types of input and a capacity of sample_count,
//! and marks the reservoir as initialized.
void ReservoirSample::InitializeReservoir(DataChunk &input) {
	reservoir_dchunk = make_unique<DataChunk>();
	auto &target = *reservoir_dchunk;
	target.Initialize(allocator, input.GetTypes(), sample_count);
	reservoir_initialized = true;
}

idx_t ReservoirSample::FillReservoir(DataChunk &input) {
idx_t chunk_count = input.size();
input.Flatten();
D_ASSERT(num_added_samples <= sample_count);

// we have not: append to the reservoir
idx_t required_count;
idx_t added_samples = SamplesInReservoir();
if (added_samples + chunk_count >= sample_count) {
if (num_added_samples + chunk_count >= sample_count) {
// have to limit the count of the chunk
required_count = sample_count - added_samples;
required_count = sample_count - num_added_samples;
} else {
// we copy the entire chunk
required_count = chunk_count;
}
// instead of copying we just change the pointer in the current chunk
input.SetCardinality(required_count);
// TODO: here we need to fix how data is appended
// ReservoirMergeChunk(input);
reservoir_dchunk.Append(input, false);

base_reservoir_sample.InitializeReservoir(SamplesInReservoir(), sample_count);
// initialize the reservoir
if (!reservoir_initialized) {
InitializeReservoir(input);
}
reservoir_dchunk->Append(input, false, nullptr, required_count);
base_reservoir_sample.InitializeReservoir(reservoir_dchunk->size(), sample_count);

num_added_samples += required_count;
reservoir_dchunk->SetCardinality(num_added_samples);


// check if there are still elements remaining
// this happens if we are on a boundary
// check if there are still elements remaining in the Input data chunk that should be
// randomly sampled and potentially added. This happens if we are on a boundary
// for example, input.size() is 1024, but our sample size is 10
if (required_count == chunk_count) {
// we are done here
Expand Down
6 changes: 3 additions & 3 deletions src/function/aggregate/holistic/reservoir_quantile.cpp
Expand Up @@ -29,7 +29,7 @@ struct ReservoirQuantileState {
}

//! Stores input in slot r_samp->min_weighted_entry of v, then calls r_samp->ReplaceElement().
void ReplaceElement(T &input) {
	v[r_samp->min_weighted_entry] = input;
	r_samp->ReplaceElement();
}

Expand All @@ -38,8 +38,8 @@ struct ReservoirQuantileState {
v[pos++] = element;
r_samp->InitializeReservoir(pos, len);
} else {
D_ASSERT(r_samp->next_index >= r_samp->current_count);
if (r_samp->next_index == r_samp->current_count) {
D_ASSERT(r_samp->next_index_to_sample >= r_samp->num_seen_entries);
if (r_samp->next_index_to_sample == r_samp->num_seen_entries) {
ReplaceElement(element);
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/include/duckdb/common/types/data_chunk.hpp
Expand Up @@ -69,6 +69,9 @@ class DataChunk {
inline void SetCapacity(const DataChunk &other) {
SetCapacity(other.capacity);
}
//! Returns the capacity of this chunk. Const, so it can also be called through a const DataChunk reference.
inline idx_t GetCapacity() const {
	return capacity;
}

DUCKDB_API Value GetValue(idx_t col_idx, idx_t index) const;
DUCKDB_API void SetValue(idx_t col_idx, idx_t index, const Value &val);
Expand Down Expand Up @@ -112,7 +115,7 @@ class DataChunk {
DUCKDB_API void Copy(DataChunk &other, const SelectionVector &sel, const idx_t source_count,
const idx_t offset = 0) const;

//! Splits the DataChunk in two
//! Splits the DataChunk in two column wise
DUCKDB_API void Split(DataChunk &other, idx_t split_idx);

//! Fuses a DataChunk onto the right of this one, and destroys the other. Inverse of Split.
Expand Down
Expand Up @@ -13,7 +13,9 @@

namespace duckdb {

//! PhysicalReservoirSample represents a sample taken using reservoir sampling, which is a blocking sampling method
//! PhysicalReservoirSample represents a sample taken using reservoir sampling,
//! which is a blocking sampling method

class PhysicalReservoirSample : public PhysicalOperator {
public:
PhysicalReservoirSample(vector<LogicalType> types, unique_ptr<SampleOptions> options, idx_t estimated_cardinality)
Expand Down
12 changes: 6 additions & 6 deletions src/include/duckdb/execution/reservoir_sample.hpp
Expand Up @@ -53,7 +53,7 @@ class BlockingSample {

//! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the
//! sample is completely built.
virtual void GetChunk() = 0;
virtual unique_ptr<DataChunk> GetChunk() = 0;

protected:
//! The reservoir sampling
Expand All @@ -71,24 +71,24 @@ class ReservoirSample : public BlockingSample {

//! Fetches a chunk from the sample. Note that this method is destructive and should only be used after the
//! sample is completely built.
void GetChunk() override;
unique_ptr<DataChunk> GetChunk() override;

private:
//! Replace a single element of the input
void ReplaceElement(DataChunk &input, idx_t index_in_chunk);
void InitializeReservoir(DataChunk &input);
idx_t SamplesInReservoir();
void ReservoirMergeChunk(DataChunk &input);
//! Fills the reservoir up until sample_count entries, returns how many entries are still required
idx_t FillReservoir(DataChunk &input);

private:
Allocator &allocator;
idx_t num_added_samples;
//! The size of the reservoir sample
idx_t sample_count;
vector<LogicalType> types;
bool reservoir_initialized;
//! The current reservoir
vector<Vector> reservoir;
DataChunk reservoir_dchunk;
unique_ptr<DataChunk> reservoir_dchunk ;
};

//! The reservoir sample sample_size class maintains a streaming sample of variable size
Expand Down
20 changes: 20 additions & 0 deletions test/sql/sample/reservoir_testing.test
@@ -0,0 +1,20 @@
# name: test/sql/sample/reservoir_testing.test
# description: Test SAMPLE keyword
# group: [sample]

require tpch

statement ok
CALL dbgen(sf=0.1);

statement ok
PRAGMA enable_verification;

statement ok
PRAGMA threads=1;

query I
SELECT COUNT(*) FROM lineitem USING SAMPLE 4096 ROWS
----
4096

0 comments on commit 605520f

Please sign in to comment.