diff --git a/src/common/progress_bar.cpp b/src/common/progress_bar.cpp index dbadc162b1b..3efe2ee08f4 100644 --- a/src/common/progress_bar.cpp +++ b/src/common/progress_bar.cpp @@ -4,8 +4,9 @@ namespace duckdb { -ProgressBar::ProgressBar(Executor &executor, idx_t show_progress_after) - : executor(executor), show_progress_after(show_progress_after), current_percentage(-1) { +ProgressBar::ProgressBar(Executor &executor, idx_t show_progress_after, bool print_progress) + : executor(executor), show_progress_after(show_progress_after), current_percentage(-1), + print_progress(print_progress) { } double ProgressBar::GetCurrentPercentage() { @@ -28,7 +29,6 @@ void ProgressBar::Update(bool final) { return; } auto sufficient_time_elapsed = profiler.Elapsed() > show_progress_after / 1000.0; - auto print_progress = ClientConfig::GetConfig(executor.context).print_progress_bar; if (new_percentage > current_percentage) { current_percentage = new_percentage; } diff --git a/src/execution/operator/scan/physical_table_scan.cpp b/src/execution/operator/scan/physical_table_scan.cpp index e97ffb7d225..b6b9a806488 100644 --- a/src/execution/operator/scan/physical_table_scan.cpp +++ b/src/execution/operator/scan/physical_table_scan.cpp @@ -113,6 +113,14 @@ void PhysicalTableScan::GetData(ExecutionContext &context, DataChunk &chunk, Glo } } +double PhysicalTableScan::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const { + if (function.table_scan_progress) { + return function.table_scan_progress(context, bind_data.get()); + } + // if table_scan_progress is not implemented we don't support this function yet in the progress bar + return -1; +} + idx_t PhysicalTableScan::GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate_p, LocalSourceState &lstate) const { D_ASSERT(SupportsBatchIndex()); diff --git a/src/execution/physical_operator.cpp b/src/execution/physical_operator.cpp index 47860f5046e..c855c24384a 100644 --- a/src/execution/physical_operator.cpp +++ b/src/execution/physical_operator.cpp @@ -74,6 +74,10 @@ idx_t PhysicalOperator::GetBatchIndex(ExecutionContext &context, DataChunk &chun LocalSourceState &lstate) const { throw InternalException("Calling GetBatchIndex on a node that does not support it"); } + +double PhysicalOperator::GetProgress(ClientContext &context, GlobalSourceState &gstate) const { + return -1; +} // LCOV_EXCL_STOP //===--------------------------------------------------------------------===// diff --git a/src/include/duckdb/common/progress_bar.hpp b/src/include/duckdb/common/progress_bar.hpp index 5e41fcb43da..685109113f4 100644 --- a/src/include/duckdb/common/progress_bar.hpp +++ b/src/include/duckdb/common/progress_bar.hpp @@ -17,7 +17,7 @@ namespace duckdb { class ProgressBar { public: - explicit ProgressBar(Executor &executor, idx_t show_progress_after); + explicit ProgressBar(Executor &executor, idx_t show_progress_after, bool print_progress); //! Starts the thread void Start(); @@ -39,6 +39,8 @@ class ProgressBar { idx_t show_progress_after; //! The current progress percentage double current_percentage; + //! Whether or not we print the progress bar + bool print_progress; //! Whether or not profiling is supported for the current query bool supported = true; }; diff --git a/src/include/duckdb/execution/operator/scan/physical_table_scan.hpp b/src/include/duckdb/execution/operator/scan/physical_table_scan.hpp index e79b6408661..f756d03eea3 100644 --- a/src/include/duckdb/execution/operator/scan/physical_table_scan.hpp +++ b/src/include/duckdb/execution/operator/scan/physical_table_scan.hpp @@ -55,6 +55,8 @@ class PhysicalTableScan : public PhysicalOperator { bool SupportsBatchIndex() const override { return function.supports_batch_index; } + + double GetProgress(ClientContext &context, GlobalSourceState &gstate) const override; }; } // namespace duckdb diff --git a/src/include/duckdb/execution/physical_operator.hpp b/src/include/duckdb/execution/physical_operator.hpp index 4542659ad8e..12eed069877 100644 --- a/src/include/duckdb/execution/physical_operator.hpp +++ b/src/include/duckdb/execution/physical_operator.hpp @@ -165,6 +165,9 @@ class PhysicalOperator { return false; } + //! Returns the current progress percentage, or a negative value if progress bars are not supported + virtual double GetProgress(ClientContext &context, GlobalSourceState &gstate) const; + public: // Sink interface diff --git a/src/include/duckdb/parallel/pipeline.hpp b/src/include/duckdb/parallel/pipeline.hpp index b92eab4431f..ecad5b2cb8a 100644 --- a/src/include/duckdb/parallel/pipeline.hpp +++ b/src/include/duckdb/parallel/pipeline.hpp @@ -81,7 +81,7 @@ class Pipeline : public std::enable_shared_from_this { void Print() const; //! Returns query progress - bool GetProgress(double ¤t_percentage); + bool GetProgress(double ¤t_percentage, idx_t &estimated_cardinality); //! Returns a list of all operators (including source and sink) involved in this pipeline vector GetOperators() const; @@ -115,7 +115,6 @@ class Pipeline : public std::enable_shared_from_this { idx_t base_batch_index = 0; private: - bool GetProgressInternal(ClientContext &context, PhysicalOperator *op, double ¤t_percentage); void ScheduleSequentialTask(shared_ptr &event); bool LaunchScanTasks(shared_ptr &event, idx_t max_threads); diff --git a/src/main/client_context.cpp b/src/main/client_context.cpp index fc38cbeb2b1..3e3d2cb7e05 100644 --- a/src/main/client_context.cpp +++ b/src/main/client_context.cpp @@ -326,7 +326,7 @@ unique_ptr ClientContext::PendingPreparedStatement(ClientCon active_query->executor = make_unique(*this); auto &executor = *active_query->executor; if (config.enable_progress_bar) { - active_query->progress_bar = make_unique(executor, config.wait_time); + active_query->progress_bar = make_unique(executor, config.wait_time, config.print_progress_bar); active_query->progress_bar->Start(); query_progress = 0; } diff --git a/src/parallel/executor.cpp b/src/parallel/executor.cpp index 4cd626a7820..3d14b0898f7 100644 --- a/src/parallel/executor.cpp +++ b/src/parallel/executor.cpp @@ -492,12 +492,24 @@ void Executor::Flush(ThreadContext &tcontext) { bool Executor::GetPipelinesProgress(double ¤t_progress) { // LCOV_EXCL_START lock_guard elock(executor_lock); - if (!pipelines.empty()) { - return pipelines.back()->GetProgress(current_progress); - } else { - current_progress = -1; - return true; + vector progress; + vector cardinality; + idx_t total_cardinality = 0; + for (auto &pipeline : pipelines) { + double child_percentage; + idx_t child_cardinality; + if (!pipeline->GetProgress(child_percentage, child_cardinality)) { + return false; + } + progress.push_back(child_percentage); + cardinality.push_back(child_cardinality); + total_cardinality += child_cardinality; } + current_progress = 0; + for (size_t i = 0; i < progress.size(); i++) { + current_progress += progress[i] * double(cardinality[i]) / double(total_cardinality); + } + return true; } // LCOV_EXCL_STOP bool Executor::HasResultCollector() { diff --git a/src/parallel/pipeline.cpp b/src/parallel/pipeline.cpp index 0945f13c9cb..517c6f2cc1c 100644 --- a/src/parallel/pipeline.cpp +++ b/src/parallel/pipeline.cpp @@ -55,50 +55,13 @@ ClientContext &Pipeline::GetClientContext() { return executor.context; } -// LCOV_EXCL_START -bool Pipeline::GetProgressInternal(ClientContext &context, PhysicalOperator *op, double ¤t_percentage) { - current_percentage = -1; - switch (op->type) { - case PhysicalOperatorType::TABLE_SCAN: { - auto &get = (PhysicalTableScan &)*op; - if (get.function.table_scan_progress) { - current_percentage = get.function.table_scan_progress(context, get.bind_data.get()); - return true; - } - // If the table_scan_progress is not implemented it means we don't support this function yet in the progress - // bar - return false; - } - // If it is not a table scan we go down on all children until we reach the leaf operators - default: { - vector progress; - vector cardinality; - double total_cardinality = 0; - current_percentage = 0; - for (auto &op_child : op->children) { - double child_percentage = 0; - if (!GetProgressInternal(context, op_child.get(), child_percentage)) { - return false; - } - if (!Value::DoubleIsFinite(child_percentage)) { - return false; - } - progress.push_back(child_percentage); - cardinality.push_back(op_child->estimated_cardinality); - total_cardinality += op_child->estimated_cardinality; - } - for (size_t i = 0; i < progress.size(); i++) { - current_percentage += progress[i] * cardinality[i] / total_cardinality; - } - return true; - } - } -} -// LCOV_EXCL_STOP +bool Pipeline::GetProgress(double ¤t_percentage, idx_t &source_cardinality) { + D_ASSERT(source); -bool Pipeline::GetProgress(double ¤t_percentage) { auto &client = executor.context; - return GetProgressInternal(client, source, current_percentage); + current_percentage = source->GetProgress(client, *source_state); + source_cardinality = source->estimated_cardinality; + return current_percentage >= 0; } void Pipeline::ScheduleSequentialTask(shared_ptr &event) {