Skip to content

Commit

Permalink
Merge pull request #3733 from Mytherin/progressbarfixes
Browse files Browse the repository at this point in the history
Progress bar clean-up: fix thread sanitizer issue, and move progress bar code to individual operators
  • Loading branch information
Mytherin committed May 28, 2022
2 parents a25b6e3 + c5d0083 commit ac159c1
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 54 deletions.
6 changes: 3 additions & 3 deletions src/common/progress_bar.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

namespace duckdb {

ProgressBar::ProgressBar(Executor &executor, idx_t show_progress_after)
: executor(executor), show_progress_after(show_progress_after), current_percentage(-1) {
ProgressBar::ProgressBar(Executor &executor, idx_t show_progress_after, bool print_progress)
: executor(executor), show_progress_after(show_progress_after), current_percentage(-1),
print_progress(print_progress) {
}

double ProgressBar::GetCurrentPercentage() {
Expand All @@ -28,7 +29,6 @@ void ProgressBar::Update(bool final) {
return;
}
auto sufficient_time_elapsed = profiler.Elapsed() > show_progress_after / 1000.0;
auto print_progress = ClientConfig::GetConfig(executor.context).print_progress_bar;
if (new_percentage > current_percentage) {
current_percentage = new_percentage;
}
Expand Down
8 changes: 8 additions & 0 deletions src/execution/operator/scan/physical_table_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ void PhysicalTableScan::GetData(ExecutionContext &context, DataChunk &chunk, Glo
}
}

double PhysicalTableScan::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const {
if (function.table_scan_progress) {
return function.table_scan_progress(context, bind_data.get());
}
// if table_scan_progress is not implemented we don't support this function yet in the progress bar
return -1;
}

idx_t PhysicalTableScan::GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate_p,
LocalSourceState &lstate) const {
D_ASSERT(SupportsBatchIndex());
Expand Down
4 changes: 4 additions & 0 deletions src/execution/physical_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ idx_t PhysicalOperator::GetBatchIndex(ExecutionContext &context, DataChunk &chun
LocalSourceState &lstate) const {
throw InternalException("Calling GetBatchIndex on a node that does not support it");
}

double PhysicalOperator::GetProgress(ClientContext &context, GlobalSourceState &gstate) const {
return -1;
}
// LCOV_EXCL_STOP

//===--------------------------------------------------------------------===//
Expand Down
4 changes: 3 additions & 1 deletion src/include/duckdb/common/progress_bar.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace duckdb {

class ProgressBar {
public:
explicit ProgressBar(Executor &executor, idx_t show_progress_after);
explicit ProgressBar(Executor &executor, idx_t show_progress_after, bool print_progress);

//! Starts the thread
void Start();
Expand All @@ -39,6 +39,8 @@ class ProgressBar {
idx_t show_progress_after;
//! The current progress percentage
double current_percentage;
//! Whether or not we print the progress bar
bool print_progress;
//! Whether or not profiling is supported for the current query
bool supported = true;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class PhysicalTableScan : public PhysicalOperator {
bool SupportsBatchIndex() const override {
return function.supports_batch_index;
}

double GetProgress(ClientContext &context, GlobalSourceState &gstate) const override;
};

} // namespace duckdb
3 changes: 3 additions & 0 deletions src/include/duckdb/execution/physical_operator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ class PhysicalOperator {
return false;
}

//! Returns the current progress percentage, or a negative value if progress bars are not supported
virtual double GetProgress(ClientContext &context, GlobalSourceState &gstate) const;

public:
// Sink interface

Expand Down
3 changes: 1 addition & 2 deletions src/include/duckdb/parallel/pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
void Print() const;

//! Returns query progress
bool GetProgress(double &current_percentage);
bool GetProgress(double &current_percentage, idx_t &estimated_cardinality);

//! Returns a list of all operators (including source and sink) involved in this pipeline
vector<PhysicalOperator *> GetOperators() const;
Expand Down Expand Up @@ -115,7 +115,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
idx_t base_batch_index = 0;

private:
bool GetProgressInternal(ClientContext &context, PhysicalOperator *op, double &current_percentage);
void ScheduleSequentialTask(shared_ptr<Event> &event);
bool LaunchScanTasks(shared_ptr<Event> &event, idx_t max_threads);

Expand Down
2 changes: 1 addition & 1 deletion src/main/client_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ unique_ptr<PendingQueryResult> ClientContext::PendingPreparedStatement(ClientCon
active_query->executor = make_unique<Executor>(*this);
auto &executor = *active_query->executor;
if (config.enable_progress_bar) {
active_query->progress_bar = make_unique<ProgressBar>(executor, config.wait_time);
active_query->progress_bar = make_unique<ProgressBar>(executor, config.wait_time, config.print_progress_bar);
active_query->progress_bar->Start();
query_progress = 0;
}
Expand Down
22 changes: 17 additions & 5 deletions src/parallel/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,12 +492,24 @@ void Executor::Flush(ThreadContext &tcontext) {
bool Executor::GetPipelinesProgress(double &current_progress) { // LCOV_EXCL_START
lock_guard<mutex> elock(executor_lock);

if (!pipelines.empty()) {
return pipelines.back()->GetProgress(current_progress);
} else {
current_progress = -1;
return true;
vector<double> progress;
vector<idx_t> cardinality;
idx_t total_cardinality = 0;
for (auto &pipeline : pipelines) {
double child_percentage;
idx_t child_cardinality;
if (!pipeline->GetProgress(child_percentage, child_cardinality)) {
return false;
}
progress.push_back(child_percentage);
cardinality.push_back(child_cardinality);
total_cardinality += child_cardinality;
}
current_progress = 0;
for (size_t i = 0; i < progress.size(); i++) {
current_progress += progress[i] * double(cardinality[i]) / double(total_cardinality);
}
return true;
} // LCOV_EXCL_STOP

bool Executor::HasResultCollector() {
Expand Down
47 changes: 5 additions & 42 deletions src/parallel/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,50 +55,13 @@ ClientContext &Pipeline::GetClientContext() {
return executor.context;
}

// LCOV_EXCL_START
bool Pipeline::GetProgressInternal(ClientContext &context, PhysicalOperator *op, double &current_percentage) {
current_percentage = -1;
switch (op->type) {
case PhysicalOperatorType::TABLE_SCAN: {
auto &get = (PhysicalTableScan &)*op;
if (get.function.table_scan_progress) {
current_percentage = get.function.table_scan_progress(context, get.bind_data.get());
return true;
}
// If the table_scan_progress is not implemented it means we don't support this function yet in the progress
// bar
return false;
}
// If it is not a table scan we go down on all children until we reach the leaf operators
default: {
vector<idx_t> progress;
vector<idx_t> cardinality;
double total_cardinality = 0;
current_percentage = 0;
for (auto &op_child : op->children) {
double child_percentage = 0;
if (!GetProgressInternal(context, op_child.get(), child_percentage)) {
return false;
}
if (!Value::DoubleIsFinite(child_percentage)) {
return false;
}
progress.push_back(child_percentage);
cardinality.push_back(op_child->estimated_cardinality);
total_cardinality += op_child->estimated_cardinality;
}
for (size_t i = 0; i < progress.size(); i++) {
current_percentage += progress[i] * cardinality[i] / total_cardinality;
}
return true;
}
}
}
// LCOV_EXCL_STOP
bool Pipeline::GetProgress(double &current_percentage, idx_t &source_cardinality) {
D_ASSERT(source);

bool Pipeline::GetProgress(double &current_percentage) {
auto &client = executor.context;
return GetProgressInternal(client, source, current_percentage);
current_percentage = source->GetProgress(client, *source_state);
source_cardinality = source->estimated_cardinality;
return current_percentage >= 0;
}

void Pipeline::ScheduleSequentialTask(shared_ptr<Event> &event) {
Expand Down

0 comments on commit ac159c1

Please sign in to comment.