Skip to content

Commit

Permalink
compiles. Now I want to figure out where I left off last time
Browse files Browse the repository at this point in the history
  • Loading branch information
Tmonster committed Nov 29, 2023
1 parent 450655c commit 43e72a4
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 17 deletions.
18 changes: 10 additions & 8 deletions src/execution/operator/helper/physical_reservoir_sample.cpp
Expand Up @@ -101,23 +101,25 @@ SinkResultType PhysicalReservoirSample::Sink(ExecutionContext &context, DataChun
return SinkResultType::NEED_MORE_INPUT;
}

void PhysicalReservoirSample::Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const {
auto &global_state = (SampleGlobalSinkState &)gstate;
auto &local_state = (SampleLocalSinkState &)lstate;
SinkCombineResultType PhysicalReservoirSample::Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const {
auto &global_state = input.global_state.Cast<SampleGlobalSinkState>();
auto &local_state = input.local_state.Cast<SampleLocalSinkState>();
lock_guard<mutex> glock(global_state.lock);
global_state.intermediate_samples.push_back(move(local_state.sample));
global_state.intermediate_samples.push_back(std::move(local_state.sample));
return SinkCombineResultType::BLOCKED;
}

SinkFinalizeType PhysicalReservoirSample::Finalize(Pipeline &pipeline, Event &event, ClientContext &context, GlobalSinkState &gstate) const {
auto &global_state = (SampleGlobalSinkState &)gstate;
SinkFinalizeType PhysicalReservoirSample::Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
OperatorSinkFinalizeInput &input) const {
auto &global_state = input.global_state.Cast<SampleGlobalSinkState>();
D_ASSERT(global_state.intermediate_samples.size() >= 1);
auto last_sample = move(global_state.intermediate_samples.back());
auto last_sample = std::move(global_state.intermediate_samples.back());
global_state.intermediate_samples.pop_back();
for (auto &sample : global_state.intermediate_samples) {
last_sample->Merge(sample);
}
last_sample->Finalize();
global_state.sample = move(last_sample);
global_state.sample = std::move(last_sample);
global_state.intermediate_samples.clear();
return SinkFinalizeType::READY;
}
Expand Down
10 changes: 5 additions & 5 deletions src/execution/reservoir_sample.cpp
Expand Up @@ -91,7 +91,7 @@ unique_ptr<DataChunk> ReservoirSample::GetChunk() {
D_ASSERT(reservoir_chunk->GetCapacity() == sample_count);
if (reservoir_chunk->size() > STANDARD_VECTOR_SIZE) {
// get from the back
auto ret = make_unique<DataChunk>();
auto ret = make_uniq<DataChunk>();
auto samples_remaining = num_added_samples - STANDARD_VECTOR_SIZE;
auto reservoir_types = reservoir_chunk->GetTypes();
SelectionVector sel(STANDARD_VECTOR_SIZE);
Expand All @@ -107,7 +107,7 @@ unique_ptr<DataChunk> ReservoirSample::GetChunk() {
return ret;
}
num_added_samples = 0;
return move(reservoir_chunk);
return std::move(reservoir_chunk);
}

void ReservoirSample::ReplaceElement(DataChunk &input, idx_t index_in_chunk, double with_weight) {
Expand All @@ -128,7 +128,7 @@ void ReservoirSample::Finalize() {
}

void ReservoirSample::InitializeReservoir(DataChunk &input) {
reservoir_chunk = make_unique<DataChunk>();
reservoir_chunk = make_uniq<DataChunk>();
reservoir_chunk->Initialize(allocator, input.GetTypes(), sample_count);
for (idx_t col_idx = 0; col_idx < reservoir_chunk->ColumnCount(); col_idx++) {
FlatVector::Validity(reservoir_chunk->data[col_idx]).Initialize(sample_count);
Expand Down Expand Up @@ -289,9 +289,9 @@ void ReservoirSamplePercentage::Finalize() {
}
new_sample->AddToReservoir(*chunk);
}
finished_samples.push_back(move(new_sample));
finished_samples.push_back(std::move(new_sample));
} else {
finished_samples.push_back(move(current_sample));
finished_samples.push_back(std::move(current_sample));
}
is_finalized = true;
}
Expand Down
Expand Up @@ -38,9 +38,9 @@ class PhysicalReservoirSample : public PhysicalOperator {
SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const override;
unique_ptr<GlobalSinkState> GetGlobalSinkState(ClientContext &context) const override;
unique_ptr<LocalSinkState> GetLocalSinkState(ExecutionContext &context) const override;
void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const override;
SinkCombineResultType Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const override;
SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
GlobalSinkState &gstate) const override;
OperatorSinkFinalizeInput &input) const override;
bool ParallelSink() const override {
return true;
}
Expand Down
3 changes: 1 addition & 2 deletions src/include/duckdb/execution/physical_operator.hpp
Expand Up @@ -135,8 +135,7 @@ class PhysicalOperator {
//! The sink method is called constantly with new input, as long as new input is available. Note that this method
//! CAN be called in parallel, proper locking is needed when accessing dat
//!a inside the GlobalSinkState.
virtual SinkResultType Sink(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate,
DataChunk &input) const;
virtual SinkResultType Sink(ExecutionContext &context, DataChunk &chunk, OperatorSinkInput &input) const;
// The combine is called when a single thread has completed execution of its part of the pipeline, it is the final
// time that a specific LocalSinkState is accessible. This method can be called in parallel while other Sink() or
// Combine() calls are active on the same GlobalSinkState.
Expand Down

0 comments on commit 43e72a4

Please sign in to comment.