Skip to content

Commit

Permalink
small syntax updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Tmonster committed Jan 4, 2023
1 parent fa2ac9c commit 750c1e3
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 40 deletions.
59 changes: 22 additions & 37 deletions src/execution/reservoir_sample.cpp
Expand Up @@ -22,7 +22,7 @@ void ReservoirSample::AddToReservoir(DataChunk &input) {
return;
}
}
D_ASSERT(reservoir_dchunk->GetCapacity() == sample_count);
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
// find the position of next_index_to_sample relative to number of seen entries (num_seen_entries)
idx_t remaining = input.size();
idx_t base_offset = 0;
Expand All @@ -33,7 +33,7 @@ void ReservoirSample::AddToReservoir(DataChunk &input) {
base_reservoir_sample.num_seen_entries += remaining;
return;
}
D_ASSERT(reservoir_dchunk->GetCapacity() == sample_count);
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
// in this chunk! replace the element
ReplaceElement(input, base_offset + offset);
// shift the chunk forward
Expand All @@ -47,47 +47,47 @@ unique_ptr<DataChunk> ReservoirSample::GetChunk() {
if (num_added_samples == 0) {
return nullptr;
}
D_ASSERT(reservoir_dchunk->GetCapacity() == sample_count);
if (reservoir_dchunk->size() > STANDARD_VECTOR_SIZE) {
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
if (reservoir_chunk->size() > STANDARD_VECTOR_SIZE) {
// get from the back
auto ret = make_unique<DataChunk>();
auto samples_remaining = num_added_samples - STANDARD_VECTOR_SIZE;
auto reservoir_types = reservoir_dchunk->GetTypes();
auto reservoir_types = reservoir_chunk->GetTypes();
SelectionVector sel(STANDARD_VECTOR_SIZE);
for (idx_t i = samples_remaining; i < num_added_samples; i++) {
sel.set_index(i - samples_remaining, i);
}
ret->Initialize(allocator, reservoir_types.begin(), reservoir_types.end(), STANDARD_VECTOR_SIZE);
reservoir_dchunk->Slice(*ret, sel, STANDARD_VECTOR_SIZE);
reservoir_chunk->Slice(*ret, sel, STANDARD_VECTOR_SIZE);
ret->SetCardinality(STANDARD_VECTOR_SIZE);
// reduce capacity and cardinality of the sample data chunk
reservoir_dchunk->SetCardinality(samples_remaining);
// reservoir_dchunk->SetCapacity(samples_remaining);
reservoir_chunk->SetCardinality(samples_remaining);
// reservoir_chunk->SetCapacity(samples_remaining);
num_added_samples = samples_remaining;
return ret;
}
num_added_samples = 0;
return move(reservoir_dchunk);
return move(reservoir_chunk);
}

void ReservoirSample::ReplaceElement(DataChunk &input, idx_t index_in_chunk) {
// replace the entry in the reservoir
// 8. The item in R with the minimum key is replaced by item
D_ASSERT(input.ColumnCount() == reservoir_dchunk->ColumnCount());
D_ASSERT(reservoir_dchunk->GetCapacity() == sample_count);
D_ASSERT(input.ColumnCount() == reservoir_chunk->ColumnCount());
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
for (idx_t col_idx = 0; col_idx < input.ColumnCount(); col_idx++) {
D_ASSERT(reservoir_dchunk->GetCapacity() == sample_count);
reservoir_dchunk->SetValue(col_idx, base_reservoir_sample.min_weighted_entry,
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
reservoir_chunk->SetValue(col_idx, base_reservoir_sample.min_weighted_entry,
input.GetValue(col_idx, index_in_chunk));
}
base_reservoir_sample.ReplaceElement();
}

void ReservoirSample::InitializeReservoir(DataChunk &input) {
reservoir_dchunk = make_unique<DataChunk>();
reservoir_dchunk->Initialize(allocator, input.GetTypes(), sample_count);
for (idx_t col_idx = 0; col_idx < reservoir_dchunk->ColumnCount(); col_idx++) {
FlatVector::Validity(reservoir_dchunk->data[col_idx]).Initialize(sample_count);
reservoir_chunk = make_unique<DataChunk>();
reservoir_chunk->Initialize(allocator, input.GetTypes(), sample_count);
for (idx_t col_idx = 0; col_idx < reservoir_chunk->ColumnCount(); col_idx++) {
FlatVector::Validity(reservoir_chunk->data[col_idx]).Initialize(sample_count);
}
reservoir_initialized = true;
}
Expand All @@ -112,12 +112,12 @@ idx_t ReservoirSample::FillReservoir(DataChunk &input) {
if (!reservoir_initialized) {
InitializeReservoir(input);
}
reservoir_dchunk->Append(input, false, nullptr, required_count);
base_reservoir_sample.InitializeReservoir(reservoir_dchunk->size(), sample_count);
reservoir_chunk->Append(input, false, nullptr, required_count);
base_reservoir_sample.InitializeReservoir(reservoir_chunk->size(), sample_count);

num_added_samples += required_count;
D_ASSERT(reservoir_dchunk->GetCapacity() == sample_count);
reservoir_dchunk->SetCardinality(num_added_samples);
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
reservoir_chunk->SetCardinality(num_added_samples);

// check if there are still elements remaining in the Input data chunk that should be
// randomly sampled and potentially added. This happens if we are on a boundary
Expand All @@ -134,7 +134,7 @@ idx_t ReservoirSample::FillReservoir(DataChunk &input) {
}
// slice the input vector and continue
input.Slice(sel, chunk_count - required_count);
D_ASSERT(reservoir_dchunk->GetCapacity() == sample_count);
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
return input.size();
}

Expand Down Expand Up @@ -198,21 +198,6 @@ unique_ptr<DataChunk> ReservoirSamplePercentage::GetChunk() {
return nullptr;
}

// if (current_count > 0) {
// // create a new sample
// auto new_sample_size = idx_t(round(sample_percentage * current_count));
// auto new_sample = make_unique<ReservoirSample>(allocator, new_sample_size, random.NextRandomInteger());
// while (true) {
// auto chunk = current_sample->GetChunk();
// if (!chunk || chunk->size() == 0) {
// break;
// }
// new_sample->AddToReservoir(*chunk);
// }
// finished_samples.push_back(move(new_sample));
// }
// is_finalized = true;

void ReservoirSamplePercentage::Finalize() {
// need to finalize the current sample, if any
// we are finializing, so we are starting to return chunks. Our last chunk has
Expand Down
2 changes: 1 addition & 1 deletion src/include/duckdb/common/types/data_chunk.hpp
Expand Up @@ -115,7 +115,7 @@ class DataChunk {
DUCKDB_API void Copy(DataChunk &other, const SelectionVector &sel, const idx_t source_count,
const idx_t offset = 0) const;

//! Splits the DataChunk in two column wise
//! Splits the DataChunk in two
DUCKDB_API void Split(DataChunk &other, idx_t split_idx);

//! Fuses a DataChunk onto the right of this one, and destroys the other. Inverse of Split.
Expand Down
4 changes: 2 additions & 2 deletions src/include/duckdb/execution/reservoir_sample.hpp
Expand Up @@ -87,7 +87,7 @@ class ReservoirSample : public BlockingSample {
idx_t sample_count;
bool reservoir_initialized;
//! The current reservoir
unique_ptr<DataChunk> reservoir_dchunk;
unique_ptr<DataChunk> reservoir_chunk;
};

//! The reservoir sample sample_size class maintains a streaming sample of variable size
Expand Down Expand Up @@ -117,7 +117,7 @@ class ReservoirSamplePercentage : public BlockingSample {
unique_ptr<ReservoirSample> current_sample;
//! The set of finished samples of the reservoir sample
vector<unique_ptr<ReservoirSample>> finished_samples;
//! The amount of tuples that have been processed so far (I think this means seen).
//! The amount of tuples that have been processed so far (not put in the reservoir, just processed)
idx_t current_count = 0;
//! Whether or not the stream is finalized. The stream is automatically finalized on the first call to GetChunk();
bool is_finalized;
Expand Down

0 comments on commit 750c1e3

Please sign in to comment.