Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/operator/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ void PipelineXLocalStateBase::reached_limit(Block* block, bool* eos) {

if (auto rows = block->rows()) {
_num_rows_returned += rows;
_state->get_query_ctx()->resource_ctx()->io_context()->update_process_rows(rows);
}
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,8 @@ Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
}
_pipeline_parent_map.clear();
_op_id_to_shared_state.clear();
// Record task cardinality once when this fragment context finishes task initialization.
_query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));

return Status::OK();
}
Expand Down Expand Up @@ -1934,6 +1936,8 @@ void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
{
std::lock_guard<std::mutex> l(_task_mutex);
++_closed_tasks;
// Update query-level finished task progress in real time.
_query_ctx->inc_finished_task_num();
if (_closed_tasks >= _total_tasks) {
need_remove = _close_fragment_instance();
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "runtime/runtime_state.h"
#include "runtime/task_execution_context.h"
#include "util/stopwatch.hpp"
#include "util/uid_util.h"

namespace doris {
struct ReportStatusRequest;
Expand Down Expand Up @@ -88,11 +89,11 @@ class PipelineFragmentContext : public TaskExecutionContext {

[[nodiscard]] int get_fragment_id() const { return _fragment_id; }

void decrement_running_task(PipelineId pipeline_id);

uint32_t rec_cte_stage() const { return _rec_cte_stage; }
void set_rec_cte_stage(uint32_t stage) { _rec_cte_stage = stage; }

void decrement_running_task(PipelineId pipeline_id);

Status send_report(bool);

void trigger_report_if_necessary();
Expand Down
12 changes: 12 additions & 0 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,4 +555,16 @@ Status QueryContext::reset_global_rf(const google::protobuf::RepeatedField<int32
return Status::OK();
}

void QueryContext::add_total_task_num(int delta) {
if (auto* qtc = dynamic_cast<QueryTaskController*>(_resource_ctx->task_controller())) {
qtc->add_total_task_num(delta);
}
}

void QueryContext::inc_finished_task_num() {
if (auto* qtc = dynamic_cast<QueryTaskController*>(_resource_ctx->task_controller())) {
qtc->inc_finished_task_num();
}
}

} // namespace doris
6 changes: 6 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ namespace doris {

class PipelineFragmentContext;
class PipelineTask;
class QueryTaskController;
class Dependency;
class RecCTEScanLocalState;

Expand Down Expand Up @@ -203,6 +204,10 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {

TUniqueId query_id() const { return _query_id; }

// Expose task-level query progress counters for runtime statistics reporting.
void add_total_task_num(int delta);
void inc_finished_task_num();

ScannerScheduler* get_scan_scheduler() { return _scan_task_scheduler; }

ScannerScheduler* get_remote_scan_scheduler() { return _remote_scan_task_scheduler; }
Expand Down Expand Up @@ -311,6 +316,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids);

private:
// Task-level progress counters for current query.
friend class QueryTaskController;

int _timeout_second;
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/workload_management/io_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
// number rows returned by query.
// only set once by result sink when closing.
RuntimeProfile::Counter* returned_rows_counter_;
RuntimeProfile::Counter* process_rows_counter_;
RuntimeProfile::Counter* shuffle_send_bytes_counter_;
RuntimeProfile::Counter* shuffle_send_rows_counter_;

Expand All @@ -63,6 +64,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
bytes_write_into_cache_counter_ =
ADD_COUNTER(profile_, "BytesWriteIntoCache", TUnit::BYTES);
returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", TUnit::UNIT);
process_rows_counter_ = ADD_COUNTER(profile_, "ProcessRows", TUnit::UNIT);
shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, "ShuffleSendBytes", TUnit::BYTES);
shuffle_send_rows_counter_ =
ADD_COUNTER(profile_, "ShuffleSendRowsCounter_", TUnit::UNIT);
Expand Down Expand Up @@ -94,6 +96,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
return stats_.bytes_write_into_cache_counter_->value();
}
int64_t returned_rows() const { return stats_.returned_rows_counter_->value(); }
int64_t process_rows() const { return stats_.process_rows_counter_->value(); }
int64_t shuffle_send_bytes() const { return stats_.shuffle_send_bytes_counter_->value(); }
int64_t shuffle_send_rows() const { return stats_.shuffle_send_rows_counter_->value(); }

Expand All @@ -117,6 +120,7 @@ class IOContext : public std::enable_shared_from_this<IOContext> {
stats_.bytes_write_into_cache_counter_->update(delta);
}
void update_returned_rows(int64_t delta) const { stats_.returned_rows_counter_->update(delta); }
void update_process_rows(int64_t delta) const { stats_.process_rows_counter_->update(delta); }
void update_shuffle_send_bytes(int64_t delta) const {
stats_.shuffle_send_bytes_counter_->update(delta);
}
Expand Down
18 changes: 18 additions & 0 deletions be/src/runtime/workload_management/query_task_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,5 +227,23 @@ std::vector<PipelineTask*> QueryTaskController::get_revocable_tasks() {
return tasks;
}

void QueryTaskController::add_total_task_num(int delta) {
_total_task_num.fetch_add(delta, std::memory_order_relaxed);
}

void QueryTaskController::inc_finished_task_num() {
_finished_task_num.fetch_add(1, std::memory_order_relaxed);
}

int QueryTaskController::get_total_task_num() const {
// Read from controller-owned counters to avoid lifecycle dependency on QueryContext.
return _total_task_num.load(std::memory_order_relaxed);
}

int QueryTaskController::get_finished_task_num() const {
// Read from controller-owned counters to avoid lifecycle dependency on QueryContext.
return _finished_task_num.load(std::memory_order_relaxed);
}

#include "common/compile_check_end.h"
} // namespace doris
10 changes: 10 additions & 0 deletions be/src/runtime/workload_management/query_task_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <atomic>

#include "common/factory_creator.h"
#include "runtime/workload_management/task_controller.h"

Expand Down Expand Up @@ -46,11 +48,19 @@ class QueryTaskController : public TaskController {
size_t get_revocable_size() override;
Status revoke_memory() override;
std::vector<PipelineTask*> get_revocable_tasks() override;
// Expose task progress counters without leaking full QueryContext.
void add_total_task_num(int delta);
void inc_finished_task_num();
int get_total_task_num() const;
int get_finished_task_num() const;

protected:
QueryTaskController(const std::shared_ptr<QueryContext>& query_ctx) : query_ctx_(query_ctx) {}

const std::weak_ptr<QueryContext> query_ctx_;
// Keep task progress counters in controller so they outlive QueryContext if needed.
std::atomic<int> _total_task_num {0};
std::atomic<int> _finished_task_num {0};
};

#include "common/compile_check_end.h"
Expand Down
8 changes: 8 additions & 0 deletions be/src/runtime/workload_management/resource_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/data.pb.h>
#include <glog/logging.h>

#include "runtime/workload_management/query_task_controller.h"
#include "util/time.h"

namespace doris {
Expand All @@ -31,6 +32,7 @@ void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
statistics->__set_scan_bytes(io_context()->scan_bytes());
statistics->__set_cpu_ms(cpu_context()->cpu_cost_ms() / NANOS_PER_MILLIS);
statistics->__set_returned_rows(io_context()->returned_rows());
statistics->__set_process_rows(io_context()->process_rows());
statistics->__set_max_peak_memory_bytes(memory_context()->max_peak_memory_bytes());
statistics->__set_current_used_memory_bytes(memory_context()->current_memory_bytes());
statistics->__set_shuffle_send_bytes(io_context()->shuffle_send_bytes());
Expand All @@ -50,6 +52,12 @@ void ResourceContext::to_thrift_query_statistics(TQueryStatistics* statistics) c
io_context_->spill_write_bytes_to_local_storage());
statistics->__set_spill_read_bytes_from_local_storage(
io_context_->spill_read_bytes_from_local_storage());

if (auto* query_task_controller = dynamic_cast<QueryTaskController*>(task_controller())) {
// Fill query task-level progress directly from task controller.
statistics->__set_total_tasks_num(query_task_controller->get_total_task_num());
statistics->__set_finished_tasks_num(query_task_controller->get_finished_task_num());
}
}

#include "common/compile_check_end.h"
Expand Down
125 changes: 125 additions & 0 deletions be/test/exec/pipeline/pipeline_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/pipeline/pipeline.h"

#include <gen_cpp/FrontendService_types.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

Expand All @@ -40,6 +41,8 @@
#include "runtime/descriptor_helper.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/workload_management/query_task_controller.h"
#include "runtime/workload_management/resource_context.h"

namespace doris {

Expand Down Expand Up @@ -467,6 +470,36 @@ TEST_F(PipelineTest, HAPPY_PATH) {
downstream_recvr->close();
}

TEST_F(PipelineTest, QueryTaskProgressCounters) {
// Verify task-level counters are updated via QueryContext and exposed by QueryTaskController.
_query_ctx->add_total_task_num(7);
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();

auto* query_task_controller =
dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
ASSERT_NE(query_task_controller, nullptr);
EXPECT_EQ(query_task_controller->get_total_task_num(), 7);
EXPECT_EQ(query_task_controller->get_finished_task_num(), 3);
}

TEST_F(PipelineTest, QueryTaskProgressCountersOutliveQueryContext) {
// Verify controller-owned counters still work after QueryContext is destroyed.
auto resource_ctx = _query_ctx->resource_ctx();
auto* query_task_controller =
dynamic_cast<QueryTaskController*>(resource_ctx->task_controller());
ASSERT_NE(query_task_controller, nullptr);

_query_ctx->add_total_task_num(5);
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();

_query_ctx.reset();
EXPECT_EQ(query_task_controller->get_total_task_num(), 5);
EXPECT_EQ(query_task_controller->get_finished_task_num(), 2);
}

TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) {
_reset();
// Pipeline(ExchangeOperator(id=0, HASH_PARTITIONED) -> ExchangeSinkOperatorX(id=1, UNPARTITIONED))
Expand Down Expand Up @@ -1163,4 +1196,96 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
downstream_recvr->close();
}

TEST_F(PipelineTest, QueryTaskProgressConcurrentUpdates) {
// Verify counters are thread-safe under concurrent updates from multiple threads.
auto resource_ctx = _query_ctx->resource_ctx();
auto* ctrl = dynamic_cast<QueryTaskController*>(resource_ctx->task_controller());
ASSERT_NE(ctrl, nullptr);

_query_ctx->add_total_task_num(400);

std::vector<std::thread> threads;
for (int i = 0; i < 8; i++) {
threads.emplace_back([this]() {
for (int j = 0; j < 50; j++) {
_query_ctx->inc_finished_task_num();
}
});
}
for (auto& t : threads) {
t.join();
}

EXPECT_EQ(ctrl->get_total_task_num(), 400);
EXPECT_EQ(ctrl->get_finished_task_num(), 400);
}

TEST_F(PipelineTest, QueryTaskProgressThriftSerialization) {
// Verify progress counters are correctly serialized to Thrift struct.
_query_ctx->add_total_task_num(10);
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();

TQueryStatistics tqs;
_query_ctx->resource_ctx()->to_thrift_query_statistics(&tqs);

EXPECT_TRUE(tqs.__isset.total_tasks_num);
EXPECT_EQ(tqs.total_tasks_num, 10);
EXPECT_TRUE(tqs.__isset.finished_tasks_num);
EXPECT_EQ(tqs.finished_tasks_num, 4);
}

TEST_F(PipelineTest, QueryTaskProgressBoundaryZeroTotal) {
// Verify behavior when no tasks have been registered (total = 0).
auto* ctrl = dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
ASSERT_NE(ctrl, nullptr);

// total = 0, finished = 0
EXPECT_EQ(ctrl->get_total_task_num(), 0);
EXPECT_EQ(ctrl->get_finished_task_num(), 0);

// inc_finished with no total should still work without crash
_query_ctx->inc_finished_task_num();
EXPECT_EQ(ctrl->get_finished_task_num(), 1);
}

TEST_F(PipelineTest, QueryTaskProgressAllFinished) {
// Verify 100% progress when all tasks finish.
_query_ctx->add_total_task_num(8);
for (int i = 0; i < 8; i++) {
_query_ctx->inc_finished_task_num();
}

auto* ctrl = dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
ASSERT_NE(ctrl, nullptr);
EXPECT_EQ(ctrl->get_total_task_num(), 8);
EXPECT_EQ(ctrl->get_finished_task_num(), 8);

// Verify thrift serialization
TQueryStatistics tqs;
_query_ctx->resource_ctx()->to_thrift_query_statistics(&tqs);
EXPECT_EQ(tqs.total_tasks_num, 8);
EXPECT_EQ(tqs.finished_tasks_num, 8);
}

TEST_F(PipelineTest, QueryTaskProgressCountersSurviveReset) {
// Verify that after calling _reset(), fresh counters are initialized to zero.
auto* ctrl1 = dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
ASSERT_NE(ctrl1, nullptr);
_query_ctx->add_total_task_num(10);
_query_ctx->inc_finished_task_num();
_query_ctx->inc_finished_task_num();
EXPECT_EQ(ctrl1->get_total_task_num(), 10);
EXPECT_EQ(ctrl1->get_finished_task_num(), 2);

// Reset creates a new QueryContext
_reset();
auto* ctrl2 = dynamic_cast<QueryTaskController*>(_query_ctx->resource_ctx()->task_controller());
ASSERT_NE(ctrl2, nullptr);
EXPECT_EQ(ctrl2->get_total_task_num(), 0);
EXPECT_EQ(ctrl2->get_finished_task_num(), 0);
}

} // namespace doris
Loading
Loading