From 008d91ba69af158500385e2358fa9c24955aba75 Mon Sep 17 00:00:00 2001 From: Wen Zhenghu Date: Sun, 10 May 2026 10:35:35 +0800 Subject: [PATCH] [improvement](executor) unify current query runtime statistics and expose task progress (#60567) **PR Summary** - This PR unifies current-query runtime statistics onto the `BE -> FE` reporting pipeline, replacing the previous ad-hoc `RuntimeProfile` traversal path, and enriches `current_queries` with task-level progress plus broader resource metrics. - The goal is to make current-query visibility more real-time and consistent with audit statistics while simplifying and consolidating FE proc/REST surfaces. **What It Solves** - Unifies statistics source: `QeProcessorImpl` now reads aggregated `TQueryStatistics` from `WorkloadRuntimeStatusMgr` instead of relying on the legacy `CurrentQueryInfoProvider` path. - Improves progress observability: introduces `process_rows`, `total_tasks_num`, and `finished_tasks_num`, and exposes computed `Progress`. - Expands runtime metrics coverage: `current_queries` now includes richer scan/cpu/memory/shuffle/spill/cache counters. - Consolidates query views: `/current_queries` and `/current_query_stmts` now share the same statistics view; legacy per-query/per-fragment proc drill-down implementation is removed. **Implementation Details** - Protocol layer: - Extends `TQueryStatistics` with `process_rows`, `finished_tasks_num`, and `total_tasks_num`. - BE collection/reporting: - Accumulates `process_rows` in the execution path. - Records `total_tasks_num` at pipeline task graph initialization and increments `finished_tasks_num` in real time when tasks close. - Mirrors task-progress counters into `QueryTaskController` so counters remain available even after `QueryContext` teardown. - Exports new fields in `ResourceContext::to_thrift_query_statistics`. 
- FE aggregation/retention: - `WorkloadRuntimeStatusMgr` merges additional fields (including task progress) and refines timeout cleanup: remove query stats only when they are timed out and the query no longer exists in FE. - `QueryStatisticsItem` now carries `TQueryStatistics` as the unified data carrier for proc/REST. - Presentation layer: - `CurrentQueryStatisticsProcDir` adds expanded columns and computes `Progress`. - `/rest/v2/manager/query/current_queries` in `QueryProfileAction` now serves the same unified stats view. - Removes legacy classes: `CurrentQueryInfoProvider`, `CurrentQuerySqlProcDir`, `CurrentQueryFragmentProcNode`, and `CurrentQueryStatementsProcNode`. ``` *************************** 1. row *************************** QueryId: e00b00b1155d4042-98862b60016a768a ConnectionId: 394 Catalog: internal Database: wzhtest User: root ExecTime: 20717 SqlHash: cf263b08302d8be436c97dd5e6f0d283 Statement: INSERT INTO test_query_progress_tb SELECT DISTINCT k, CONCAT(v, CAST(k AS STRING)) FROM test_query_progress_tb WHERE k % 2 = 0 ScanRows: 45400000 Rows ScanBytes: 2.70 GB ProcessRows: 75598123 Rows CpuMs: 178336 MaxPeakMemoryBytes: 13.03 GB CurrentUsedMemoryBytes: 8.69 GB WorkloadGroupId: 1777125330381 ShuffleSendBytes: 0.00 ShuffleSendRows: 0 Rows ScanBytesFromLocalStorage: 31.48 MB ScanBytesFromRemoteStorage: 0.00 SpillWriteBytesToLocalStorage: 0.00 SpillReadBytesFromLocalStorage: 0.00 BytesWriteIntoCache: 0.00 TotalTasks: 74 FinishedTasks: 51 Progress: 68% ------------------------ -- first-- QueryId: e2b8c99658a94743-9ebbf0d036d83295 ConnectionId: 9 Catalog: hive_test Database: tpch100_parquet User: root ExecTime: 6093 SqlHash: f8a30e4182d72cce3eff6cb385005b1f Statement: select ... from supplier, lineitem l1, orders, nation ... 
limit 100 ScanRows: 621466194 Rows ScanBytes: 5.37 GB ProcessRows: 79079742 Rows CpuMs: 31655 MaxPeakMemoryBytes: 2.32 GB CurrentUsedMemoryBytes: 2.18 GB WorkloadGroupId: 1777253545394 ShuffleSendBytes: 0.00 ShuffleSendRows: 0 Rows ScanBytesFromLocalStorage: 0.00 ScanBytesFromRemoteStorage: 5.37 GB SpillWriteBytesToLocalStorage: 0.00 SpillReadBytesFromLocalStorage: 0.00 BytesWriteIntoCache: 0.00 TotalTasks: 138 FinishedTasks: 49 Progress: 35% --second-- QueryId: e2b8c99658a94743-9ebbf0d036d83295 ConnectionId: 9 Catalog: hive_test Database: tpch100_parquet User: root ExecTime: 10807 SqlHash: f8a30e4182d72cce3eff6cb385005b1f Statement: select ... from supplier, lineitem l1, orders, nation ... limit 100 ScanRows: 1102562592 Rows ScanBytes: 9.20 GB ProcessRows: 112176670 Rows CpuMs: 53808 MaxPeakMemoryBytes: 3.13 GB CurrentUsedMemoryBytes: 2.50 GB WorkloadGroupId: 1777253545394 ShuffleSendBytes: 0.00 ShuffleSendRows: 0 Rows ScanBytesFromLocalStorage: 0.00 ScanBytesFromRemoteStorage: 9.20 GB SpillWriteBytesToLocalStorage: 0.00 SpillReadBytesFromLocalStorage: 0.00 BytesWriteIntoCache: 0.00 TotalTasks: 138 FinishedTasks: 65 Progress: 47% ``` Release note: None - Test - [x] Regression test - [x] Unit Test - [x] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [x] Yes. - Does this need documentation? - [ ] No. - [x] Yes. 
- [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --------- Co-authored-by: yiguolei Co-authored-by: xuchenhao <419062425@qq.com> Co-authored-by: xuchenhao <48084123+xuchenhao@users.noreply.github.com> --- be/src/exec/operator/operator.cpp | 1 + .../pipeline/pipeline_fragment_context.cpp | 4 + .../exec/pipeline/pipeline_fragment_context.h | 5 +- be/src/runtime/query_context.cpp | 12 + be/src/runtime/query_context.h | 6 + .../runtime/workload_management/io_context.h | 4 + .../query_task_controller.cpp | 18 ++ .../query_task_controller.h | 10 + .../workload_management/resource_context.cpp | 8 + be/test/exec/pipeline/pipeline_test.cpp | 125 ++++++++ .../proc/CurrentQueryFragmentProcNode.java | 89 ------ .../common/proc/CurrentQueryInfoProvider.java | 200 ------------ .../common/proc/CurrentQuerySqlProcDir.java | 70 ----- .../proc/CurrentQueryStatementsProcNode.java | 71 ----- .../proc/CurrentQueryStatisticsProcDir.java | 80 +++-- .../apache/doris/common/proc/ProcService.java | 2 +- .../doris/common/profile/RuntimeProfile.java | 5 - .../rest/manager/QueryProfileAction.java | 10 +- .../org/apache/doris/qe/QeProcessorImpl.java | 5 + .../apache/doris/qe/QueryStatisticsItem.java | 18 ++ .../WorkloadRuntimeStatusMgr.java | 112 +++++-- .../CurrentQueryStatisticsProcDirTest.java | 89 ++++++ .../WorkloadRuntimeStatusMgrTest.java | 285 ++++++++++++++++++ gensrc/thrift/FrontendService.thrift | 3 + 24 files changed, 739 insertions(+), 493 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java create mode 100644 
fe/fe-core/src/test/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDirTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgrTest.java diff --git a/be/src/exec/operator/operator.cpp b/be/src/exec/operator/operator.cpp index 27d8acf859aa80..c2be7392075221 100644 --- a/be/src/exec/operator/operator.cpp +++ b/be/src/exec/operator/operator.cpp @@ -423,6 +423,7 @@ void PipelineXLocalStateBase::reached_limit(Block* block, bool* eos) { if (auto rows = block->rows()) { _num_rows_returned += rows; + _state->get_query_ctx()->resource_ctx()->io_context()->update_process_rows(rows); } } diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index e04745a853eb46..c8f83ad07812e1 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -602,6 +602,8 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { } _pipeline_parent_map.clear(); _op_id_to_shared_state.clear(); + // Record task cardinality once when this fragment context finishes task initialization. + _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed)); return Status::OK(); } @@ -1934,6 +1936,8 @@ void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) { { std::lock_guard l(_task_mutex); ++_closed_tasks; + // Update query-level finished task progress in real time. 
+ _query_ctx->inc_finished_task_num(); if (_closed_tasks >= _total_tasks) { need_remove = _close_fragment_instance(); } diff --git a/be/src/exec/pipeline/pipeline_fragment_context.h b/be/src/exec/pipeline/pipeline_fragment_context.h index 01306799aa451b..c220ea386f68db 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.h +++ b/be/src/exec/pipeline/pipeline_fragment_context.h @@ -39,6 +39,7 @@ #include "runtime/runtime_state.h" #include "runtime/task_execution_context.h" #include "util/stopwatch.hpp" +#include "util/uid_util.h" namespace doris { struct ReportStatusRequest; @@ -88,11 +89,11 @@ class PipelineFragmentContext : public TaskExecutionContext { [[nodiscard]] int get_fragment_id() const { return _fragment_id; } + void decrement_running_task(PipelineId pipeline_id); + uint32_t rec_cte_stage() const { return _rec_cte_stage; } void set_rec_cte_stage(uint32_t stage) { _rec_cte_stage = stage; } - void decrement_running_task(PipelineId pipeline_id); - Status send_report(bool); void trigger_report_if_necessary(); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index e00a37aadb1ec3..6fb8e41d824808 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -555,4 +555,16 @@ Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField(_resource_ctx->task_controller())) { + qtc->add_total_task_num(delta); + } +} + +void QueryContext::inc_finished_task_num() { + if (auto* qtc = dynamic_cast(_resource_ctx->task_controller())) { + qtc->inc_finished_task_num(); + } +} + } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index d5e559890c0830..5e2b6babe87305 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -47,6 +47,7 @@ namespace doris { class PipelineFragmentContext; class PipelineTask; +class QueryTaskController; class Dependency; class RecCTEScanLocalState; @@ -203,6 +204,10 @@ class QueryContext : public 
std::enable_shared_from_this { TUniqueId query_id() const { return _query_id; } + // Expose task-level query progress counters for runtime statistics reporting. + void add_total_task_num(int delta); + void inc_finished_task_num(); + ScannerScheduler* get_scan_scheduler() { return _scan_task_scheduler; } ScannerScheduler* get_remote_scan_scheduler() { return _remote_scan_task_scheduler; } @@ -311,6 +316,7 @@ class QueryContext : public std::enable_shared_from_this { Status reset_global_rf(const google::protobuf::RepeatedField& filter_ids); private: + // Task-level progress counters for current query. friend class QueryTaskController; int _timeout_second; diff --git a/be/src/runtime/workload_management/io_context.h b/be/src/runtime/workload_management/io_context.h index 3c0dd4343fddb2..1032c148a63041 100644 --- a/be/src/runtime/workload_management/io_context.h +++ b/be/src/runtime/workload_management/io_context.h @@ -45,6 +45,7 @@ class IOContext : public std::enable_shared_from_this { // number rows returned by query. // only set once by result sink when closing. 
RuntimeProfile::Counter* returned_rows_counter_; + RuntimeProfile::Counter* process_rows_counter_; RuntimeProfile::Counter* shuffle_send_bytes_counter_; RuntimeProfile::Counter* shuffle_send_rows_counter_; @@ -63,6 +64,7 @@ class IOContext : public std::enable_shared_from_this { bytes_write_into_cache_counter_ = ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES); returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", TUnit::UNIT); + process_rows_counter_ = ADD_COUNTER(profile_, "ProcessRows", TUnit::UNIT); shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, "ShuffleSendBytes", TUnit::BYTES); shuffle_send_rows_counter_ = ADD_COUNTER(profile_, "ShuffleSendRowsCounter_", TUnit::UNIT); @@ -94,6 +96,7 @@ class IOContext : public std::enable_shared_from_this { return stats_.bytes_write_into_cache_counter_->value(); } int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); } + int64_t process_rows() const { return stats_.process_rows_counter_->value(); } int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); } int64_t shuffle_send_rows() const { return stats_.shuffle_send_rows_counter_->value(); } @@ -117,6 +120,7 @@ class IOContext : public std::enable_shared_from_this { stats_.bytes_write_into_cache_counter_->update(delta); } void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); } + void update_process_rows(int64_t delta) const { stats_.process_rows_counter_->update(delta); } void update_shuffle_send_bytes(int64_t delta) const { stats_.shuffle_send_bytes_counter_->update(delta); } diff --git a/be/src/runtime/workload_management/query_task_controller.cpp b/be/src/runtime/workload_management/query_task_controller.cpp index 47d6c7c05cd572..16e950fa56c7c9 100644 --- a/be/src/runtime/workload_management/query_task_controller.cpp +++ b/be/src/runtime/workload_management/query_task_controller.cpp @@ -227,5 +227,23 @@ std::vector 
QueryTaskController::get_revocable_tasks() { return tasks; } +void QueryTaskController::add_total_task_num(int delta) { + _total_task_num.fetch_add(delta, std::memory_order_relaxed); +} + +void QueryTaskController::inc_finished_task_num() { + _finished_task_num.fetch_add(1, std::memory_order_relaxed); +} + +int QueryTaskController::get_total_task_num() const { + // Read from controller-owned counters to avoid lifecycle dependency on QueryContext. + return _total_task_num.load(std::memory_order_relaxed); +} + +int QueryTaskController::get_finished_task_num() const { + // Read from controller-owned counters to avoid lifecycle dependency on QueryContext. + return _finished_task_num.load(std::memory_order_relaxed); +} + #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/runtime/workload_management/query_task_controller.h b/be/src/runtime/workload_management/query_task_controller.h index f10a846c538d8e..0d46196c1509e6 100644 --- a/be/src/runtime/workload_management/query_task_controller.h +++ b/be/src/runtime/workload_management/query_task_controller.h @@ -17,6 +17,8 @@ #pragma once +#include + #include "common/factory_creator.h" #include "runtime/workload_management/task_controller.h" @@ -46,11 +48,19 @@ class QueryTaskController : public TaskController { size_t get_revocable_size() override; Status revoke_memory() override; std::vector get_revocable_tasks() override; + // Expose task progress counters without leaking full QueryContext. + void add_total_task_num(int delta); + void inc_finished_task_num(); + int get_total_task_num() const; + int get_finished_task_num() const; protected: QueryTaskController(const std::shared_ptr& query_ctx) : query_ctx_(query_ctx) {} const std::weak_ptr query_ctx_; + // Keep task progress counters in controller so they outlive QueryContext if needed. 
+ std::atomic _total_task_num {0}; + std::atomic _finished_task_num {0}; }; #include "common/compile_check_end.h" diff --git a/be/src/runtime/workload_management/resource_context.cpp b/be/src/runtime/workload_management/resource_context.cpp index c1b0fd2b744338..d7a729aa69c1d7 100644 --- a/be/src/runtime/workload_management/resource_context.cpp +++ b/be/src/runtime/workload_management/resource_context.cpp @@ -20,6 +20,7 @@ #include #include +#include "runtime/workload_management/query_task_controller.h" #include "util/time.h" namespace doris { @@ -31,6 +32,7 @@ void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c statistics->__set_scan_bytes(io_context()->scan_bytes()); statistics->__set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS); statistics->__set_returned_rows(io_context()->returned_rows()); + statistics->__set_process_rows(io_context()->process_rows()); statistics->__set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes()); statistics->__set_current_used_memory_bytes(memory_context()->current_memory_bytes()); statistics->__set_shuffle_send_bytes(io_context()->shuffle_send_bytes()); @@ -50,6 +52,12 @@ void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c io_context_->spill_write_bytes_to_local_storage()); statistics->__set_spill_read_bytes_from_local_storage( io_context_->spill_read_bytes_from_local_storage()); + + if (auto* query_task_controller = dynamic_cast(task_controller())) { + // Fill query task-level progress directly from task controller. 
+ statistics->__set_total_tasks_num(query_task_controller->get_total_task_num()); + statistics->__set_finished_tasks_num(query_task_controller->get_finished_task_num()); + } } #include "common/compile_check_end.h" diff --git a/be/test/exec/pipeline/pipeline_test.cpp b/be/test/exec/pipeline/pipeline_test.cpp index 4150e281a3539c..d5d0b5028d6300 100644 --- a/be/test/exec/pipeline/pipeline_test.cpp +++ b/be/test/exec/pipeline/pipeline_test.cpp @@ -17,6 +17,7 @@ #include "exec/pipeline/pipeline.h" +#include #include #include @@ -40,6 +41,8 @@ #include "runtime/descriptor_helper.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" +#include "runtime/workload_management/query_task_controller.h" +#include "runtime/workload_management/resource_context.h" namespace doris { @@ -467,6 +470,36 @@ TEST_F(PipelineTest, HAPPY_PATH) { downstream_recvr->close(); } +TEST_F(PipelineTest, QueryTaskProgressCounters) { + // Verify task-level counters are updated via QueryContext and exposed by QueryTaskController. + _query_ctx->add_total_task_num(7); + _query_ctx->inc_finished_task_num(); + _query_ctx->inc_finished_task_num(); + _query_ctx->inc_finished_task_num(); + + auto* query_task_controller = + dynamic_cast(_query_ctx->resource_ctx()->task_controller()); + ASSERT_NE(query_task_controller, nullptr); + EXPECT_EQ(query_task_controller->get_total_task_num(), 7); + EXPECT_EQ(query_task_controller->get_finished_task_num(), 3); +} + +TEST_F(PipelineTest, QueryTaskProgressCountersOutliveQueryContext) { + // Verify controller-owned counters still work after QueryContext is destroyed. 
+ auto resource_ctx = _query_ctx->resource_ctx(); + auto* query_task_controller = + dynamic_cast(resource_ctx->task_controller()); + ASSERT_NE(query_task_controller, nullptr); + + _query_ctx->add_total_task_num(5); + _query_ctx->inc_finished_task_num(); + _query_ctx->inc_finished_task_num(); + + _query_ctx.reset(); + EXPECT_EQ(query_task_controller->get_total_task_num(), 5); + EXPECT_EQ(query_task_controller->get_finished_task_num(), 2); +} + TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) { _reset(); // Pipeline(ExchangeOperator(id=0, HASH_PARTITIONED) -> ExchangeSinkOperatorX(id=1, UNPARTITIONED)) @@ -1163,4 +1196,96 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) { downstream_recvr->close(); } +TEST_F(PipelineTest, QueryTaskProgressConcurrentUpdates) { + // Verify counters are thread-safe under concurrent updates from multiple threads. + auto resource_ctx = _query_ctx->resource_ctx(); + auto* ctrl = dynamic_cast(resource_ctx->task_controller()); + ASSERT_NE(ctrl, nullptr); + + _query_ctx->add_total_task_num(400); + + std::vector threads; + for (int i = 0; i < 8; i++) { + threads.emplace_back([this]() { + for (int j = 0; j < 50; j++) { + _query_ctx->inc_finished_task_num(); + } + }); + } + for (auto& t : threads) { + t.join(); + } + + EXPECT_EQ(ctrl->get_total_task_num(), 400); + EXPECT_EQ(ctrl->get_finished_task_num(), 400); +} + +TEST_F(PipelineTest, QueryTaskProgressThriftSerialization) { + // Verify progress counters are correctly serialized to Thrift struct. 
+ _query_ctx->add_total_task_num(10); + _query_ctx->inc_finished_task_num(); + _query_ctx->inc_finished_task_num(); + _query_ctx->inc_finished_task_num(); + _query_ctx->inc_finished_task_num(); + + TQueryStatistics tqs; + _query_ctx->resource_ctx()->to_thrift_query_statistics(&tqs); + + EXPECT_TRUE(tqs.__isset.total_tasks_num); + EXPECT_EQ(tqs.total_tasks_num, 10); + EXPECT_TRUE(tqs.__isset.finished_tasks_num); + EXPECT_EQ(tqs.finished_tasks_num, 4); +} + +TEST_F(PipelineTest, QueryTaskProgressBoundaryZeroTotal) { + // Verify behavior when no tasks have been registered (total = 0). + auto* ctrl = dynamic_cast(_query_ctx->resource_ctx()->task_controller()); + ASSERT_NE(ctrl, nullptr); + + // total = 0, finished = 0 + EXPECT_EQ(ctrl->get_total_task_num(), 0); + EXPECT_EQ(ctrl->get_finished_task_num(), 0); + + // inc_finished with no total should still work without crash + _query_ctx->inc_finished_task_num(); + EXPECT_EQ(ctrl->get_finished_task_num(), 1); +} + +TEST_F(PipelineTest, QueryTaskProgressAllFinished) { + // Verify 100% progress when all tasks finish. + _query_ctx->add_total_task_num(8); + for (int i = 0; i < 8; i++) { + _query_ctx->inc_finished_task_num(); + } + + auto* ctrl = dynamic_cast(_query_ctx->resource_ctx()->task_controller()); + ASSERT_NE(ctrl, nullptr); + EXPECT_EQ(ctrl->get_total_task_num(), 8); + EXPECT_EQ(ctrl->get_finished_task_num(), 8); + + // Verify thrift serialization + TQueryStatistics tqs; + _query_ctx->resource_ctx()->to_thrift_query_statistics(&tqs); + EXPECT_EQ(tqs.total_tasks_num, 8); + EXPECT_EQ(tqs.finished_tasks_num, 8); +} + +TEST_F(PipelineTest, QueryTaskProgressCountersSurviveReset) { + // Verify that after calling _reset(), fresh counters are initialized to zero. 
+ auto* ctrl1 = dynamic_cast(_query_ctx->resource_ctx()->task_controller()); + ASSERT_NE(ctrl1, nullptr); + _query_ctx->add_total_task_num(10); + _query_ctx->inc_finished_task_num(); + _query_ctx->inc_finished_task_num(); + EXPECT_EQ(ctrl1->get_total_task_num(), 10); + EXPECT_EQ(ctrl1->get_finished_task_num(), 2); + + // Reset creates a new QueryContext + _reset(); + auto* ctrl2 = dynamic_cast(_query_ctx->resource_ctx()->task_controller()); + ASSERT_NE(ctrl2, nullptr); + EXPECT_EQ(ctrl2->get_total_task_num(), 0); + EXPECT_EQ(ctrl2->get_finished_task_num(), 0); +} + } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java deleted file mode 100644 index 27b7e673d389d5..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java +++ /dev/null @@ -1,89 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.common.proc; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.util.QueryStatisticsFormatter; -import org.apache.doris.qe.QueryStatisticsItem; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collection; -import java.util.Comparator; -import java.util.List; - -/* - * show proc "/current_queries/{query_id}/fragments" - * set variable "set is_report_success = true" to enable "ScanBytes" and "ProcessRows". - */ -public class CurrentQueryFragmentProcNode implements ProcNodeInterface { - private static final Logger LOG = LogManager.getLogger(CurrentQueryFragmentProcNode.class); - public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("FragmentId").add("InstanceId").add("Host") - .add("ScanBytes").add("ProcessRows").build(); - private QueryStatisticsItem item; - - public CurrentQueryFragmentProcNode(QueryStatisticsItem item) { - this.item = item; - } - - @Override - public ProcResult fetchResult() throws AnalysisException { - return requestFragmentExecInfos(); - } - - private ProcResult requestFragmentExecInfos() throws AnalysisException { - final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider(); - final Collection instanceStatisticsCollection - = provider.getInstanceStatistics(item); - final List> sortedRowData = Lists.newArrayList(); - for (CurrentQueryInfoProvider.InstanceStatistics instanceStatistics : - instanceStatisticsCollection) { - final List rowData = Lists.newArrayList(); - rowData.add(instanceStatistics.getFragmentId()); - rowData.add(instanceStatistics.getInstanceId().toString()); - rowData.add(instanceStatistics.getAddress().toString()); - if (item.getIsReportSucc()) { - rowData.add(QueryStatisticsFormatter.getScanBytes( - instanceStatistics.getScanBytes())); - 
rowData.add(QueryStatisticsFormatter.getRowsReturned( - instanceStatistics.getRowsReturned())); - } else { - rowData.add("N/A"); - rowData.add("N/A"); - } - sortedRowData.add(rowData); - } - - // sort according to explain's fragment index - sortedRowData.sort(new Comparator>() { - @Override - public int compare(List l1, List l2) { - return l1.get(0).compareTo(l2.get(0)); - } - }); - final BaseProcResult result = new BaseProcResult(); - result.setNames(TITLE_NAMES.asList()); - result.setRows(sortedRowData); - return result; - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java deleted file mode 100644 index de7247ab3ab8da..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java +++ /dev/null @@ -1,200 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.common.proc; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.profile.Counter; -import org.apache.doris.common.profile.RuntimeProfile; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.qe.QueryStatisticsItem; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TUniqueId; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -/** - * Provide running query's statistics. - */ -public class CurrentQueryInfoProvider { - private static final Logger LOG = LogManager.getLogger(CurrentQueryInfoProvider.class); - - public CurrentQueryInfoProvider() { - } - - /** - * get Counters from Coordinator's RuntimeProfile and return query's statistics. - * - * @param item - * @return - * @throws AnalysisException - */ - public QueryStatistics getQueryStatistics(QueryStatisticsItem item) throws AnalysisException { - return new QueryStatistics(item.getQueryProfile()); - } - - /** - * - * @param items - * @return - * @throws AnalysisException - */ - public Map getQueryStatistics(Collection items) { - final Map queryStatisticsMap = Maps.newHashMap(); - for (QueryStatisticsItem item : items) { - queryStatisticsMap.put(item.getQueryId(), new QueryStatistics(item.getQueryProfile())); - } - return queryStatisticsMap; - } - - /** - * Return query's instances statistics. 
- * - * @param item - * @return - * @throws AnalysisException - */ - public Collection getInstanceStatistics(QueryStatisticsItem item) throws AnalysisException { - final Map instanceProfiles = collectInstanceProfile(item.getQueryProfile()); - final List instanceStatisticsList = Lists.newArrayList(); - for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) { - final RuntimeProfile instanceProfile - = instanceProfiles.get(DebugUtil.printId(instanceInfo.getInstanceId())); - Preconditions.checkNotNull(instanceProfile); - final InstanceStatistics Statistics = - new InstanceStatistics( - instanceInfo.getFragmentId(), - instanceInfo.getInstanceId(), - instanceInfo.getAddress(), - instanceProfile); - instanceStatisticsList.add(Statistics); - } - return instanceStatisticsList; - } - - /** - * Profile trees is query profile -> fragment profile -> instance profile .... - * @param queryProfile - * @return instanceProfiles - */ - private Map collectInstanceProfile(RuntimeProfile queryProfile) { - final Map instanceProfiles = Maps.newHashMap(); - for (RuntimeProfile fragmentProfile : queryProfile.getChildMap().values()) { - for (Map.Entry entry : fragmentProfile.getChildMap().entrySet()) { - Preconditions.checkState(instanceProfiles.put( - parseInstanceId(entry.getKey()), entry.getValue()) == null); - } - } - return instanceProfiles; - } - - /** - * Instance profile key is "Instance ${instance_id} (host=$host $port)" - * @param str - * @return - */ - private String parseInstanceId(String str) { - final String[] elements = str.split(" "); - if (elements.length == 4) { - return elements[1]; - } else { - Preconditions.checkState(false); - return ""; - } - } - - public static class QueryStatistics { - final List> counterMaps; - - public QueryStatistics(RuntimeProfile profile) { - counterMaps = Lists.newArrayList(); - collectCounters(profile, counterMaps); - } - - private void collectCounters(RuntimeProfile profile, - List> counterMaps) { - for 
(Map.Entry entry : profile.getChildMap().entrySet()) { - counterMaps.add(entry.getValue().getCounterMap()); - collectCounters(entry.getValue(), counterMaps); - } - } - - public long getScanBytes() { - long scanBytes = 0; - for (Map counters : counterMaps) { - final Counter counter = counters.get("CompressedBytesRead"); - scanBytes += counter == null ? 0 : counter.getValue(); - } - return scanBytes; - } - - public long getRowsReturned() { - long rowsReturned = 0; - for (Map counters : counterMaps) { - final Counter counter = counters.get("RowsReturned"); - rowsReturned += counter == null ? 0 : counter.getValue(); - } - return rowsReturned; - } - } - - public static class InstanceStatistics { - private final String fragmentId; - private final TUniqueId instanceId; - private final TNetworkAddress address; - private final QueryStatistics statistics; - - public InstanceStatistics( - String fragmentId, - TUniqueId instanceId, - TNetworkAddress address, - RuntimeProfile profile) { - this.fragmentId = fragmentId; - this.instanceId = instanceId; - this.address = address; - this.statistics = new QueryStatistics(profile); - } - - public String getFragmentId() { - return fragmentId; - } - - public TUniqueId getInstanceId() { - return instanceId; - } - - public TNetworkAddress getAddress() { - return address; - } - - public long getRowsReturned() { - return statistics.getRowsReturned(); - } - - public long getScanBytes() { - return statistics.getScanBytes(); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java deleted file mode 100644 index 2d21b656ef3ff9..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQuerySqlProcDir.java +++ /dev/null @@ -1,70 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.proc; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.qe.QueryStatisticsItem; - -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import java.util.List; - -/* - * show proc "/current_queries/{query_id}" - */ -public class CurrentQuerySqlProcDir implements ProcDirInterface { - - public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("Sql").build(); - - private final QueryStatisticsItem item; - - public CurrentQuerySqlProcDir(QueryStatisticsItem item) { - this.item = item; - } - - @Override - public boolean register(String name, ProcNodeInterface node) { - return false; - } - - @Override - public ProcNodeInterface lookup(String name) throws AnalysisException { - if (Strings.isNullOrEmpty(name)) { - return null; - } - - if (!name.equals("fragments")) { - throw new AnalysisException(name + " doesn't exist."); - } - - return new CurrentQueryFragmentProcNode(item); - } - - @Override - public ProcResult fetchResult() throws AnalysisException { - final BaseProcResult result = new BaseProcResult(); - result.setNames(TITLE_NAMES.asList()); - final List values = Lists.newArrayList(); - values.add(item.getSql()); 
- result.addRow(values); - return result; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java deleted file mode 100644 index 746726c90514b3..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatementsProcNode.java +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.common.proc; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.qe.QeProcessorImpl; -import org.apache.doris.qe.QueryStatisticsItem; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.commons.codec.digest.DigestUtils; - -import java.util.List; -import java.util.Map; - -/* - * show proc "/current_query_stmts" - */ -public class CurrentQueryStatementsProcNode implements ProcNodeInterface { - public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("QueryId").add("ConnectionId").add("Catalog").add("Database").add("User") - .add("ExecTime").add("SqlHash").add("Statement").build(); - - private static final int EXEC_TIME_INDEX = 5; - - @Override - public ProcResult fetchResult() throws AnalysisException { - final BaseProcResult result = new BaseProcResult(); - final Map statistic = - QeProcessorImpl.INSTANCE.getQueryStatistics(); - result.setNames(TITLE_NAMES.asList()); - final List> sortedRowData = Lists.newArrayList(); - - for (QueryStatisticsItem item : statistic.values()) { - final List values = Lists.newArrayList(); - values.add(item.getQueryId()); - values.add(item.getConnId()); - values.add(item.getCatalog()); - values.add(item.getDb()); - values.add(item.getUser()); - values.add(item.getQueryExecTime()); - values.add(DigestUtils.md5Hex(item.getSql())); - values.add(item.getSql()); - sortedRowData.add(values); - } - - // sort according to ExecTime - sortedRowData.sort((l1, l2) -> { - final long execTime1 = Long.parseLong(l1.get(EXEC_TIME_INDEX)); - final long execTime2 = Long.parseLong(l2.get(EXEC_TIME_INDEX)); - return Long.compare(execTime2, execTime1); - }); - result.setRows(sortedRowData); - return result; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java index 
0df36c0040f953..88a68964f82168 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java @@ -21,24 +21,33 @@ import org.apache.doris.common.util.QueryStatisticsFormatter; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QueryStatisticsItem; +import org.apache.doris.thrift.TQueryStatistics; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.commons.codec.digest.DigestUtils; import java.util.List; import java.util.Map; /* * show proc "/current_queries" - * only set variable "set is_report_success = true" to enable "ScanBytes" and "ProcessRows". + * the statistics are the same as the data in the audit log. */ public class CurrentQueryStatisticsProcDir implements ProcDirInterface { + // ProcessRows is temporarily used for Doris Manager compatibility and will be implemented in the future. 
public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("QueryId").add("ConnectionId").add("Catalog").add("Database").add("User") - .add("ScanBytes").add("ProcessRows").add("ExecTime").build(); + .add("ExecTime").add("SqlHash").add("Statement") + .add("ScanRows").add("ScanBytes").add("ProcessRows").add("CpuMs") + .add("MaxPeakMemoryBytes").add("CurrentUsedMemoryBytes").add("WorkloadGroupId") + .add("ShuffleSendBytes").add("ShuffleSendRows") + .add("ScanBytesFromLocalStorage").add("ScanBytesFromRemoteStorage") + .add("SpillWriteBytesToLocalStorage").add("SpillReadBytesFromLocalStorage") + .add("BytesWriteIntoCache") + .add("TotalTasks").add("FinishedTasks").add("Progress").build(); - private static final int EXEC_TIME_INDEX = 7; + private static final int EXEC_TIME_INDEX = 5; @Override public boolean register(String name, ProcNodeInterface node) { @@ -47,15 +56,7 @@ public boolean register(String name, ProcNodeInterface node) { @Override public ProcNodeInterface lookup(String name) throws AnalysisException { - if (Strings.isNullOrEmpty(name)) { - return null; - } - final Map statistic = QeProcessorImpl.INSTANCE.getQueryStatistics(); - final QueryStatisticsItem item = statistic.get(name); - if (item == null) { - throw new AnalysisException(name + " doesn't exist."); - } - return new CurrentQuerySqlProcDir(item); + throw new AnalysisException("operation doesn't support."); } @Override @@ -65,32 +66,41 @@ public ProcResult fetchResult() throws AnalysisException { QeProcessorImpl.INSTANCE.getQueryStatistics(); result.setNames(TITLE_NAMES.asList()); final List> sortedRowData = Lists.newArrayList(); - - final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider(); - final Map statisticsMap - = provider.getQueryStatistics(statistic.values()); for (QueryStatisticsItem item : statistic.values()) { final List values = Lists.newArrayList(); + final TQueryStatistics queryStatistics = item.getQueryStatistics(); values.add(item.getQueryId()); 
values.add(item.getConnId()); values.add(item.getCatalog()); values.add(item.getDb()); values.add(item.getUser()); - if (item.getIsReportSucc()) { - final CurrentQueryInfoProvider.QueryStatistics statistics - = statisticsMap.get(item.getQueryId()); - values.add(QueryStatisticsFormatter.getScanBytes( - statistics.getScanBytes())); - values.add(QueryStatisticsFormatter.getRowsReturned( - statistics.getRowsReturned())); - } else { - values.add("N/A"); - values.add("N/A"); - } values.add(item.getQueryExecTime()); + values.add(DigestUtils.md5Hex(item.getSql())); + values.add(item.getSql()); + values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getScanRows())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytes())); + values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getProcessRows())); + values.add(String.valueOf(queryStatistics.getCpuMs())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getMaxPeakMemoryBytes())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getCurrentUsedMemoryBytes())); + values.add(String.valueOf(queryStatistics.getWorkloadGroupId())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getShuffleSendBytes())); + values.add(QueryStatisticsFormatter.getRowsReturned(queryStatistics.getShuffleSendRows())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytesFromLocalStorage())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getScanBytesFromRemoteStorage())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getSpillWriteBytesToLocalStorage())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getSpillReadBytesFromLocalStorage())); + values.add(QueryStatisticsFormatter.getScanBytes(queryStatistics.getBytesWriteIntoCache())); + + long total = queryStatistics.isSetTotalTasksNum() ? 
queryStatistics.getTotalTasksNum() : 0; + long finished = queryStatistics.isSetFinishedTasksNum() ? queryStatistics.getFinishedTasksNum() : 0; + values.add(String.valueOf(total)); + values.add(String.valueOf(finished)); + values.add(formatProgress(total, finished)); + sortedRowData.add(values); } - // sort according to ExecTime + sortedRowData.sort((l1, l2) -> { final long execTime1 = Long.parseLong(l1.get(EXEC_TIME_INDEX)); final long execTime2 = Long.parseLong(l2.get(EXEC_TIME_INDEX)); @@ -99,4 +109,16 @@ public ProcResult fetchResult() throws AnalysisException { result.setRows(sortedRowData); return result; } + + /** + * Format task progress as a percentage string with one decimal place. + * Visible for testing. + */ + static String formatProgress(long total, long finished) { + if (total > 0) { + double pct = (double) finished * 100 / total; + return String.format("%.1f%%", pct); + } + return "0.0%"; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java index 42010ccbd204ae..a1f54901bde833 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java @@ -49,7 +49,7 @@ private ProcService() { root.register("trash", new TrashProcDir()); root.register("monitor", new MonitorProcDir()); root.register("current_queries", new CurrentQueryStatisticsProcDir()); - root.register("current_query_stmts", new CurrentQueryStatementsProcNode()); + root.register("current_query_stmts", new CurrentQueryStatisticsProcDir()); root.register("current_backend_instances", new CurrentQueryBackendInstanceProcDir()); root.register("cluster_balance", new ClusterBalanceProcDir()); root.register("cluster_health", new ClusterHealthProcDir()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java index 2f499a8dd4cc66..cb55f350f07c87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java @@ -55,11 +55,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -/** - * It is accessed by two kinds of thread, one is to create this RuntimeProfile - * , named 'query thread', the other is to call - * {@link org.apache.doris.common.proc.CurrentQueryInfoProvider}. - */ public class RuntimeProfile { // TODO: 这里维护性太差了 // BE 上的 OperatorXBase::init 里面有 Operator 的命名规则 diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java index 5480858efbf0e0..e619a81326e985 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java @@ -23,7 +23,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.Pair; import org.apache.doris.common.Status; -import org.apache.doris.common.proc.CurrentQueryStatementsProcNode; +import org.apache.doris.common.proc.CurrentQueryStatisticsProcDir; import org.apache.doris.common.proc.ProcResult; import org.apache.doris.common.profile.ProfileManager; import org.apache.doris.common.profile.ProfileManager.ProfileElement; @@ -450,7 +450,7 @@ private void checkAuthByUserAndQueryId(String queryId) throws AuthenticationExce } /** - * return the result of CurrentQueryStatementsProcNode. + * return the result of CurrentQueryStatisticsProcDir. 
* * @param request * @param response @@ -480,15 +480,15 @@ public Object currentQueries(HttpServletRequest request, HttpServletResponse res LOG.warn("parse query info error: {}", data, e); } } - List titles = Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES); + List titles = Lists.newArrayList(CurrentQueryStatisticsProcDir.TITLE_NAMES); titles.add(0, FRONTEND); return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(titles, queries)); } else { try { - CurrentQueryStatementsProcNode node = new CurrentQueryStatementsProcNode(); + CurrentQueryStatisticsProcDir node = new CurrentQueryStatisticsProcDir(); ProcResult result = node.fetchResult(); // add frontend info at first column. - List titles = Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES); + List titles = Lists.newArrayList(CurrentQueryStatisticsProcDir.TITLE_NAMES); titles.add(0, FRONTEND); List> rows = result.getRows(); String feIp = FrontendOptions.getLocalHostAddress(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 275dc234706f4c..ff023aeb9394de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -29,6 +29,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryProfile; +import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TStatus; @@ -212,6 +213,8 @@ public void unregisterQuery(TUniqueId queryId) { @Override public Map getQueryStatistics() { final Map querySet = Maps.newHashMap(); + final Map queryStatisticsMap = + Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().getQueryStatisticsMap(); for (Map.Entry entry : coordinatorMap.entrySet()) { final QueryInfo info = 
entry.getValue(); final ConnectContext context = info.getConnectContext(); @@ -219,12 +222,14 @@ public Map getQueryStatistics() { continue; } final String queryIdStr = DebugUtil.printId(info.getConnectContext().queryId()); + final TQueryStatistics queryStatistics = queryStatisticsMap.get(queryIdStr); final QueryStatisticsItem item = new QueryStatisticsItem.Builder().queryId(queryIdStr) .queryStartTime(info.getStartExecTime()).sql(info.getSql()).user(context.getQualifiedUser()) .connId(String.valueOf(context.getConnectionId())).db(context.getDatabase()) .catalog(context.getDefaultCatalog()) .fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos()) .profile(info.getCoord().getExecutionProfile().getRoot()) + .queryStatistics(queryStatistics) .isReportSucc(context.getSessionVariable().enableProfile()).build(); querySet.put(queryIdStr, item); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java index c51ff24ca14f26..f879903e65a17c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java @@ -19,6 +19,7 @@ import org.apache.doris.common.profile.RuntimeProfile; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TQueryStatistics; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Lists; @@ -38,6 +39,8 @@ public final class QueryStatisticsItem { // root query profile private final RuntimeProfile queryProfile; private final boolean isReportSucc; + // query statistics same as statistics in audit log + private final TQueryStatistics queryStatistics; private QueryStatisticsItem(Builder builder) { this.queryId = builder.queryId; @@ -50,6 +53,7 @@ private QueryStatisticsItem(Builder builder) { this.fragmentInstanceInfos = builder.fragmentInstanceInfos; this.queryProfile = builder.queryProfile; this.isReportSucc = 
builder.isReportSucc; + this.queryStatistics = builder.queryStatistics; } public String getDb() { @@ -97,6 +101,10 @@ public boolean getIsReportSucc() { return isReportSucc; } + public TQueryStatistics getQueryStatistics() { + return queryStatistics; + } + public static final class Builder { private String queryId; private String catalog; @@ -108,6 +116,7 @@ public static final class Builder { private List fragmentInstanceInfos; private RuntimeProfile queryProfile; private boolean isReportSucc; + private TQueryStatistics queryStatistics; public Builder() { fragmentInstanceInfos = Lists.newArrayList(); @@ -163,6 +172,11 @@ public Builder isReportSucc(boolean isReportSucc) { return this; } + public Builder queryStatistics(TQueryStatistics queryStatistics) { + this.queryStatistics = queryStatistics; + return this; + } + public QueryStatisticsItem build() { initDefaultValue(this); return new QueryStatisticsItem(this); @@ -192,6 +206,10 @@ private void initDefaultValue(Builder builder) { if (queryProfile == null) { queryProfile = new RuntimeProfile(""); } + + if (queryStatistics == null) { + queryStatistics = new TQueryStatistics(); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index cc31e7858e0852..7920b1e59a2e0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -20,12 +20,15 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.plugin.AuditEvent; +import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.thrift.TQueryStatistics; 
import org.apache.doris.thrift.TQueryStatisticsResult; import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -48,7 +51,10 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class); + // backend id --> {query id --> (query last report time, query stats)} private Map beToQueryStatsMap = Maps.newConcurrentMap(); + // Publish an immutable snapshot for synchronous proc/REST readers. + private volatile Map queryStatisticsSnapshot = ImmutableMap.of(); private final ReentrantLock queryAuditEventLock = new ReentrantLock(); private List queryAuditEventList = Lists.newLinkedList(); private volatile long lastWarnTime; @@ -60,6 +66,7 @@ private class BeReportInfo { this.beLastReportTime = beLastReportTime; } + // query id --> (query last report time, query stats) Map> queryStatsMap = Maps.newConcurrentMap(); } @@ -69,10 +76,12 @@ public WorkloadRuntimeStatusMgr() { @Override protected void runAfterCatalogReady() { - // 1 merge be query statistics + // 1 rebuild and publish query statistics snapshot + rebuildQueryStatisticsSnapshot(); + // 2 read the latest immutable snapshot for downstream processing Map queryStatisticsMap = getQueryStatisticsMap(); - // 2 log query audit + // 3 log query audit try { List auditEventList = getQueryNeedAudit(); int missedLogCount = 0; @@ -106,10 +115,17 @@ protected void runAfterCatalogReady() { LOG.warn("exception happens when handleAuditEvent, ", t); } - // 3 clear beToQueryStatsMap when be report timeout + // 4 clear beToQueryStatsMap when be report timeout clearReportTimeoutBeStatistics(); } + // After the query or insert finished, FE will not audit immediately, it will send an audit + // event to this queue. And the worker thread will handle it. 
If the queue is full, the event + // will be handled immediately and may miss some statistic info. So the statistic info of audit + // event may not be accurate, but it can avoid an FE OOM caused by too many audit + // events in queue when QPS is high. The event will be logged directly if the queue is full. + // And the worker thread will get an event from the queue and get the statistic info for this + // event from queryStatisticsMap. public void submitFinishQueryToAudit(AuditEvent event) { queryAuditEventLogWriteLock(); try { @@ -121,9 +137,9 @@ public void submitFinishQueryToAudit(AuditEvent event) { // if queryAuditEventList is full, we don't put the event to queryAuditEventList. // so that the statistic info of this audit event will be ignored, // and event will be logged directly. - LOG.warn("audit log event queue size {} is full, this may cause audit log missing statistics." - + "you can check whether qps is too high or " - + "set audit_event_log_queue_size to a larger value in fe.conf. query id: {}", + LOG.warn("audit log event queue size {} is full, this may cause audit log missing " + + "statistics. you can check whether qps is too high or set " + + "audit_event_log_queue_size to a larger value in fe.conf. 
query id: {}", queryAuditEventList.size(), event.queryId); } Env.getCurrentAuditEventProcessor().handleAuditEvent(event); @@ -186,7 +202,7 @@ public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) { } } - void clearReportTimeoutBeStatistics() { + private void clearReportTimeoutBeStatistics() { // 1 clear report timeout be Set currentBeIdSet = beToQueryStatsMap.keySet(); Long currentTime = System.currentTimeMillis(); @@ -200,24 +216,59 @@ void clearReportTimeoutBeStatistics() { for (String queryId : queryIdSet) { Pair pair = beReportInfo.queryStatsMap.get(queryId); long queryLastReportTime = pair.first; - if (currentTime - queryLastReportTime > Config.be_report_query_statistics_timeout_ms) { + boolean timeout = currentTime - queryLastReportTime + > Config.be_report_query_statistics_timeout_ms; + // Remove query statistics only when both conditions are satisfied: + // 1) this query statistics is timeout, and + // 2) FE no longer has this query in QeProcessorImpl. + // Example timeline: + // - t0: query q1 is still running, but one periodic BE report is delayed for > timeout. + // - t1: clear thread runs. timeout condition is true, but q1 still exists in FE. + // - t2: we keep q1 statistics instead of removing it; later reports can update it again. + if (timeout && isQueryNotExistInFe(queryId)) { beReportInfo.queryStatsMap.remove(queryId); } } } } - // NOTE: currently getQueryStatisticsMap must be called before clear beToQueryStatsMap - // so there is no need lock or null check when visit beToQueryStatsMap + private boolean isQueryNotExistInFe(String queryId) { + try { + return QeProcessorImpl.INSTANCE.getCoordinator(DebugUtil.parseTUniqueIdFromString(queryId)) == null; + } catch (NumberFormatException e) { + return true; + } + } + + // Rebuild query statistics from concurrent runtime maps and publish an immutable snapshot. + // This method is intentionally called by daemon thread and unit tests only. 
+ void rebuildQueryStatisticsSnapshot() { + queryStatisticsSnapshot = ImmutableMap.copyOf(buildQueryStatisticsMapUnsafe()); + } + + // Return the latest published snapshot for synchronous readers such as proc/REST paths. public Map getQueryStatisticsMap() { + return queryStatisticsSnapshot; + } + + // Build a merged map by traversing concurrent runtime structures. + private Map buildQueryStatisticsMapUnsafe() { // 1 merge query stats in all be Set beIdSet = beToQueryStatsMap.keySet(); Map resultQueryMap = Maps.newHashMap(); for (Long beId : beIdSet) { BeReportInfo beReportInfo = beToQueryStatsMap.get(beId); + if (beReportInfo == null) { + continue; + } Set queryIdSet = beReportInfo.queryStatsMap.keySet(); for (String queryId : queryIdSet) { - TQueryStatisticsResult curQueryStats = beReportInfo.queryStatsMap.get(queryId).second; + Pair queryStatsPair = + beReportInfo.queryStatsMap.get(queryId); + if (queryStatsPair == null || queryStatsPair.second == null) { + continue; + } + TQueryStatisticsResult curQueryStats = queryStatsPair.second; TQueryStatistics retQuery = resultQueryMap.get(queryId); if (retQuery == null) { @@ -248,18 +299,37 @@ private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatisticsResult s if (srcStats == null) { return; } - dst.scan_rows += srcStats.scan_rows; - dst.scan_bytes += srcStats.scan_bytes; - dst.scan_bytes_from_local_storage += srcStats.scan_bytes_from_local_storage; - dst.scan_bytes_from_remote_storage += srcStats.scan_bytes_from_remote_storage; - dst.cpu_ms += srcStats.cpu_ms; - dst.shuffle_send_bytes += srcStats.shuffle_send_bytes; - dst.shuffle_send_rows += srcStats.shuffle_send_rows; + dst.setScanRows(dst.scan_rows + srcStats.scan_rows); + dst.setScanBytes(dst.scan_bytes + srcStats.scan_bytes); + dst.setScanBytesFromLocalStorage(dst.scan_bytes_from_local_storage + + srcStats.scan_bytes_from_local_storage); + dst.setScanBytesFromRemoteStorage(dst.scan_bytes_from_remote_storage + + srcStats.scan_bytes_from_remote_storage); 
+ dst.setCpuMs(dst.cpu_ms + srcStats.cpu_ms); + dst.setShuffleSendBytes(dst.shuffle_send_bytes + srcStats.shuffle_send_bytes); + dst.setShuffleSendRows(dst.shuffle_send_rows + srcStats.shuffle_send_rows); + dst.setProcessRows(dst.process_rows + srcStats.process_rows); + dst.setReturnedRows(dst.returned_rows + srcStats.returned_rows); + if (srcStats.isSetTotalTasksNum()) { + dst.setTotalTasksNum(dst.total_tasks_num + srcStats.total_tasks_num); + } + if (srcStats.isSetFinishedTasksNum()) { + dst.setFinishedTasksNum(dst.finished_tasks_num + srcStats.finished_tasks_num); + } + if (dst.current_used_memory_bytes < srcStats.current_used_memory_bytes) { + dst.setCurrentUsedMemoryBytes(srcStats.current_used_memory_bytes); + } + if (dst.workload_group_id <= 0 && srcStats.workload_group_id > 0) { + dst.setWorkloadGroupId(srcStats.workload_group_id); + } if (dst.max_peak_memory_bytes < srcStats.max_peak_memory_bytes) { - dst.max_peak_memory_bytes = srcStats.max_peak_memory_bytes; + dst.setMaxPeakMemoryBytes(srcStats.max_peak_memory_bytes); } - dst.spill_write_bytes_to_local_storage += srcStats.spill_write_bytes_to_local_storage; - dst.spill_read_bytes_from_local_storage += srcStats.spill_read_bytes_from_local_storage; + dst.setSpillWriteBytesToLocalStorage(dst.spill_write_bytes_to_local_storage + + srcStats.spill_write_bytes_to_local_storage); + dst.setSpillReadBytesFromLocalStorage(dst.spill_read_bytes_from_local_storage + + srcStats.spill_read_bytes_from_local_storage); + dst.setBytesWriteIntoCache(dst.bytes_write_into_cache + srcStats.bytes_write_into_cache); } private void queryAuditEventLogWriteLock() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDirTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDirTest.java new file mode 100644 index 00000000000000..0c9accd89e55f9 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDirTest.java @@ 
-0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.proc; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit test for CurrentQueryStatisticsProcDir progress formatting. + * Tests the formatProgress method directly — no mocking or reflection required. 
+ */ +public class CurrentQueryStatisticsProcDirTest { + + @Test + public void testProgressNormal() { + // 7 out of 20 tasks finished = 35.0% + Assert.assertEquals("35.0%", CurrentQueryStatisticsProcDir.formatProgress(20, 7)); + } + + @Test + public void testProgressAllFinished() { + // 8 out of 8 = 100.0% + Assert.assertEquals("100.0%", CurrentQueryStatisticsProcDir.formatProgress(8, 8)); + } + + @Test + public void testProgressOneThird() { + // 1 out of 3 = 33.3% + Assert.assertEquals("33.3%", CurrentQueryStatisticsProcDir.formatProgress(3, 1)); + } + + @Test + public void testProgressTwoThirds() { + // 2 out of 3 = 66.7% + Assert.assertEquals("66.7%", CurrentQueryStatisticsProcDir.formatProgress(3, 2)); + } + + @Test + public void testProgressZeroPercent() { + // 0 out of 5 = 0.0% + Assert.assertEquals("0.0%", CurrentQueryStatisticsProcDir.formatProgress(5, 0)); + } + + @Test + public void testProgressZeroTotal() { + // total = 0, finished = 0 → "0.0%" (no division by zero) + Assert.assertEquals("0.0%", CurrentQueryStatisticsProcDir.formatProgress(0, 0)); + } + + @Test + public void testProgressFinishedExceedsTotal() { + // Defensive: if finished > total, still returns a percentage (may exceed 100%) + Assert.assertEquals("200.0%", CurrentQueryStatisticsProcDir.formatProgress(5, 10)); + } + + @Test + public void testProgressNegativeTotal() { + // total < 0 → returns "0.0%" + Assert.assertEquals("0.0%", CurrentQueryStatisticsProcDir.formatProgress(-1, 5)); + } + + @Test + public void testProgressLargeValues() { + // Verify no overflow with large numbers + Assert.assertEquals("50.0%", + CurrentQueryStatisticsProcDir.formatProgress(Integer.MAX_VALUE, Integer.MAX_VALUE / 2)); + } + + @Test + public void testProgressFractional() { + // 1 out of 7 = 14.3% (14.2857... 
rounds to 14.3) + Assert.assertEquals("14.3%", CurrentQueryStatisticsProcDir.formatProgress(7, 1)); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgrTest.java new file mode 100644 index 00000000000000..ae17dcc430a52f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgrTest.java @@ -0,0 +1,285 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.thrift.TQueryStatistics; +import org.apache.doris.thrift.TQueryStatisticsResult; +import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams; + +import com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +/** + * Unit test for WorkloadRuntimeStatusMgr. + * Verifies that query progress statistics from multiple BEs are correctly + * merged via the Thrift setter-based merge logic. 
+ */
+public class WorkloadRuntimeStatusMgrTest {
+
+    private WorkloadRuntimeStatusMgr mgr;
+
+    @Before
+    public void setUp() {
+        mgr = new WorkloadRuntimeStatusMgr();
+    }
+
+    // ---- Merge: single BE ----
+
+    @Test
+    public void testSingleBeProgressMerge() {
+        // Simulate one BE reporting progress for one query.
+        long beId = 10001L;
+        TQueryStatistics stats = new TQueryStatistics();
+        stats.setTotalTasksNum(10);
+        stats.setFinishedTasksNum(3);
+
+        TReportWorkloadRuntimeStatusParams params = buildParams(beId, "q1", stats);
+        mgr.updateBeQueryStats(params);
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        Assert.assertEquals(1, merged.size());
+
+        TQueryStatistics result = merged.get("q1");
+        Assert.assertNotNull(result);
+        Assert.assertEquals(10, result.getTotalTasksNum());
+        Assert.assertEquals(3, result.getFinishedTasksNum());
+    }
+
+    // ---- Merge: multiple BEs, same query (summing across BEs) ----
+
+    @Test
+    public void testMultiBeSummingAcrossQuery() {
+        // BE1: total=10, finished=3
+        // BE2: total=8, finished=5
+        // Merged: total=18, finished=8
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 3)));
+        mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(8, 5)));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        Assert.assertEquals(1, merged.size());
+
+        TQueryStatistics result = merged.get("q1");
+        Assert.assertEquals(18, result.getTotalTasksNum());
+        Assert.assertEquals(8, result.getFinishedTasksNum());
+    }
+
+    // ---- Merge: multiple BEs, multiple queries remain independent ----
+
+    @Test
+    public void testMultiQueryIndependence() {
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 2)));
+        mgr.updateBeQueryStats(buildParams(10001L, "q2", buildStats(20, 15)));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        Assert.assertEquals(2, merged.size());
+
+        Assert.assertEquals(10, merged.get("q1").getTotalTasksNum());
+        Assert.assertEquals(2, merged.get("q1").getFinishedTasksNum());
+        Assert.assertEquals(20, merged.get("q2").getTotalTasksNum());
+        Assert.assertEquals(15, merged.get("q2").getFinishedTasksNum());
+    }
+
+    // ---- isSet flag: unset fields should not override previous values ----
+
+    @Test
+    public void testIsSetPreservesPreviousValues() {
+        // BE1 reports total=10, finished=3 with isSet properly set
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 3)));
+
+        // BE2 reports same query but does NOT set total_tasks_num
+        // This simulates an older BE version without progress support.
+        TQueryStatistics stats2 = new TQueryStatistics();
+        // Intentionally NOT calling setTotalTasksNum/setFinishedTasksNum
+        // (isSet* returns false)
+        stats2.setScanRows(100); // set some other field to make it non-empty
+        mgr.updateBeQueryStats(buildParams(10002L, "q1", stats2));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        TQueryStatistics result = merged.get("q1");
+
+        // BE2 didn't set total/finished, so original values from BE1 should be preserved
+        Assert.assertEquals(10, result.getTotalTasksNum());
+        Assert.assertEquals(3, result.getFinishedTasksNum());
+    }
+
+    // ---- Zero-reporting BE should not interfere ----
+
+    @Test
+    public void testBeWithZeroProgress() {
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(10, 4)));
+
+        // BE2 reports zero progress correctly
+        mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(0, 0)));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        TQueryStatistics result = merged.get("q1");
+
+        // total=10, finished=4 (from BE1); BE2's (0,0) is additive → still (10,4)
+        Assert.assertEquals(10, result.getTotalTasksNum());
+        Assert.assertEquals(4, result.getFinishedTasksNum());
+    }
+
+    // ---- getQueryStatistics returns per-BE map ----
+
+    @Test
+    public void testGetQueryStatisticsPerBe() {
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(5, 2)));
+        mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(3, 1)));
+
+        Map<Long, TQueryStatisticsResult> perBe = mgr.getQueryStatistics("q1");
+        Assert.assertEquals(2, perBe.size());
+        Assert.assertTrue(perBe.containsKey(10001L));
+        Assert.assertTrue(perBe.containsKey(10002L));
+        Assert.assertEquals(5, perBe.get(10001L).getStatistics().getTotalTasksNum());
+        Assert.assertEquals(3, perBe.get(10002L).getStatistics().getTotalTasksNum());
+    }
+
+    // ---- Non-existent query returns empty map ----
+
+    @Test
+    public void testGetQueryStatisticsNonExistent() {
+        Map<Long, TQueryStatisticsResult> perBe = mgr.getQueryStatistics("non-existent-query");
+        Assert.assertTrue(perBe.isEmpty());
+    }
+
+    // ---- updateBeQueryStats with missing fields ----
+
+    @Test
+    public void testUpdateBeQueryStatsMissingBackendId() {
+        TReportWorkloadRuntimeStatusParams params = new TReportWorkloadRuntimeStatusParams();
+        // backend_id not set, updateBeQueryStats should log a warning and return early
+        mgr.updateBeQueryStats(params);
+        Assert.assertTrue(getMergedSnapshot().isEmpty());
+    }
+
+    // ---- updateBeQueryStats with missing query stats map ----
+
+    @Test
+    public void testUpdateBeQueryStatsMissingQueryStatsMap() {
+        TReportWorkloadRuntimeStatusParams params = new TReportWorkloadRuntimeStatusParams();
+        params.setBackendId(10001L);
+        // query_statistics_result_map not set → should return early
+        mgr.updateBeQueryStats(params);
+        Assert.assertTrue(getMergedSnapshot().isEmpty());
+    }
+
+    // ---- isSet flag: verifying Thrift setter behavior inline ----
+
+    @Test
+    public void testThriftIsSetFlagRequired() {
+        // Confirm that the generated setter marks the field as set (isSet* returns true),
+        // whereas direct field assignment does not.
+        // This documents the historical bug that was fixed in this feature.
+
+        TQueryStatistics viaSetter = new TQueryStatistics();
+        viaSetter.setTotalTasksNum(5);
+        Assert.assertTrue("setTotalTasksNum via setter must set __isset flag",
+                viaSetter.isSetTotalTasksNum());
+
+        TQueryStatistics viaField = new TQueryStatistics();
+        viaField.total_tasks_num = 5; // direct field assignment
+        Assert.assertFalse("direct field assignment must NOT set __isset flag",
+                viaField.isSetTotalTasksNum());
+
+        // Same for finished_tasks_num
+        viaSetter.setFinishedTasksNum(3);
+        Assert.assertTrue(viaSetter.isSetFinishedTasksNum());
+
+        viaField.finished_tasks_num = 3;
+        Assert.assertFalse(viaField.isSetFinishedTasksNum());
+    }
+
+    // ---- Merge without any progress fields ----
+
+    @Test
+    public void testMergeWithoutProgressFields() {
+        // Regression test: when isSet is false for progress fields,
+        // merge should not touch them, leaving them at default (0).
+        TQueryStatistics stats = new TQueryStatistics();
+        // Intentionally leave total/finished unset
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", stats));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        TQueryStatistics result = merged.get("q1");
+
+        // Both counters should still read 0 (only the values, not the isSet flags, are asserted here)
+        Assert.assertEquals(0, result.getTotalTasksNum());
+        Assert.assertEquals(0, result.getFinishedTasksNum());
+    }
+
+    // ---- Merge: three BEs combined ----
+
+    @Test
+    public void testThreeBeMergeProgress() {
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(4, 1)));
+        mgr.updateBeQueryStats(buildParams(10002L, "q1", buildStats(3, 3)));
+        mgr.updateBeQueryStats(buildParams(10003L, "q1", buildStats(5, 0)));
+
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        Assert.assertEquals(1, merged.size());
+
+        TQueryStatistics result = merged.get("q1");
+        // total = 4 + 3 + 5 = 12, finished = 1 + 3 + 0 = 4
+        Assert.assertEquals(12, result.getTotalTasksNum());
+        Assert.assertEquals(4, result.getFinishedTasksNum());
+    }
+
+    @Test
+    public void testSnapshotReadRequiresRebuild() {
+        mgr.updateBeQueryStats(buildParams(10001L, "q1", buildStats(6, 2)));
+        // Newly reported data is not visible to sync readers before snapshot rebuild.
+        Assert.assertTrue(mgr.getQueryStatisticsMap().isEmpty());
+
+        // Rebuild snapshot and verify the new data becomes visible.
+        Map<String, TQueryStatistics> merged = getMergedSnapshot();
+        Assert.assertEquals(1, merged.size());
+        Assert.assertEquals(6, merged.get("q1").getTotalTasksNum());
+        Assert.assertEquals(2, merged.get("q1").getFinishedTasksNum());
+    }
+
+    // ---- helper methods ----
+
+    private TQueryStatistics buildStats(int totalTasks, int finishedTasks) {
+        TQueryStatistics stats = new TQueryStatistics();
+        stats.setTotalTasksNum(totalTasks);
+        stats.setFinishedTasksNum(finishedTasks);
+        return stats;
+    }
+
+    private TReportWorkloadRuntimeStatusParams buildParams(long beId, String queryId, TQueryStatistics stats) {
+        TQueryStatisticsResult result = new TQueryStatisticsResult();
+        result.setStatistics(stats);
+
+        TReportWorkloadRuntimeStatusParams params = new TReportWorkloadRuntimeStatusParams();
+        params.setBackendId(beId);
+        Map<String, TQueryStatisticsResult> map = Maps.newHashMap();
+        map.put(queryId, result);
+        params.setQueryStatisticsResultMap(map);
+        return params;
+    }
+
+    // Refresh and read snapshot to match daemon-driven visibility semantics.
+    private Map<String, TQueryStatistics> getMergedSnapshot() {
+        mgr.rebuildQueryStatisticsSnapshot();
+        return mgr.getQueryStatisticsMap();
+    }
+}
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 3b1c55fc295d16..52246428345070 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -202,6 +202,9 @@ struct TQueryStatistics {
     12: optional i64 spill_write_bytes_to_local_storage
     13: optional i64 spill_read_bytes_from_local_storage
     14: optional i64 bytes_write_into_cache
+    15: optional i64 process_rows
+    16: optional i32 finished_tasks_num
+    17: optional i32 total_tasks_num
 }
 
 struct TQueryStatisticsResult {