From 6aa3cec937598b6b9c52617c7a3465ad10ce88b0 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Wed, 15 Apr 2026 09:53:00 +0800 Subject: [PATCH 1/4] [refactor](fragment mgr) move report logic to pipeline fragment context to remove callback parameter from ctor --- .../pipeline/pipeline_fragment_context.cpp | 272 ++++++++++++++++- .../exec/pipeline/pipeline_fragment_context.h | 22 +- be/src/runtime/fragment_mgr.cpp | 283 +----------------- be/src/runtime/fragment_mgr.h | 7 - 4 files changed, 272 insertions(+), 312 deletions(-) diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index f6042fa5589330..a068b3d8048b71 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -18,6 +18,8 @@ #include "exec/pipeline/pipeline_fragment_context.h" #include +#include +#include #include #include #include @@ -26,6 +28,8 @@ #include // IWYU pragma: no_include #include +#include +#include #include // IWYU pragma: keep #include @@ -122,23 +126,24 @@ #include "runtime/result_buffer_mgr.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" +#include "service/backend_options.h" +#include "util/client_cache.h" #include "util/countdown_latch.h" #include "util/debug_util.h" +#include "util/network_util.h" #include "util/uid_util.h" namespace doris { PipelineFragmentContext::PipelineFragmentContext( TUniqueId query_id, const TPipelineFragmentParams& request, std::shared_ptr query_ctx, ExecEnv* exec_env, - const std::function& call_back, - report_status_callback report_status_cb) + const std::function& call_back) : _query_id(std::move(query_id)), _fragment_id(request.fragment_id), _exec_env(exec_env), _query_ctx(std::move(query_ctx)), _call_back(call_back), _is_report_on_cancel(true), - _report_status_cb(std::move(report_status_cb)), _params(request), _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0), _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close @@ -1958,6 +1963,256 @@ std::string PipelineFragmentContext::get_first_error_msg() { return ""; } +std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const { + std::stringstream url; + url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port + << "/api/_download_load?" + << "token=" << _exec_env->token() << "&file=" << file_name; + return url.str(); +} + +void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) { + DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", { + int random_seconds = req.status.is() ? 8 : 2; + LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id)); + std::this_thread::sleep_for(std::chrono::seconds(random_seconds)); + LOG_INFO("sleep done").tag("query_id", print_id(req.query_id)); + }); + + DCHECK(req.status.ok() || req.done); // if !status.ok() => done + if (req.coord_addr.hostname == "external") { + // External query (flink/spark read tablets) not need to report to FE. + return; + } + int callback_retries = 10; + const int sleep_ms = 1000; + Status exec_status = req.status; + Status coord_status; + std::unique_ptr coord = nullptr; + do { + coord = std::make_unique(_exec_env->frontend_client_cache(), + req.coord_addr, &coord_status); + if (!coord_status.ok()) { + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + } + } while (!coord_status.ok() && callback_retries-- > 0); + + if (!coord_status.ok()) { + UniqueId uid(req.query_id.hi, req.query_id.lo); + static_cast(req.cancel_fn(Status::InternalError( + "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(), + PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string()))); + return; + } + + TReportExecStatusParams params; + params.protocol_version = FrontendServiceVersion::V1; + params.__set_query_id(req.query_id); + params.__set_backend_num(req.backend_num); + params.__set_fragment_instance_id(req.fragment_instance_id); + params.__set_fragment_id(req.fragment_id); + params.__set_status(exec_status.to_thrift()); + params.__set_done(req.done); + params.__set_query_type(req.runtime_state->query_type()); + params.__isset.profile = false; + + DCHECK(req.runtime_state != nullptr); + + if (req.runtime_state->query_type() == TQueryType::LOAD) { + params.__set_loaded_rows(req.runtime_state->num_rows_load_total()); + params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); + } else { + DCHECK(!req.runtime_states.empty()); + if (!req.runtime_state->output_files().empty()) { + params.__isset.delta_urls = true; + for (auto& it : req.runtime_state->output_files()) { + params.delta_urls.push_back(_to_http_path(it)); + } + } + if (!params.delta_urls.empty()) { + params.__isset.delta_urls = true; + } + } + + static std::string s_dpp_normal_all = "dpp.norm.ALL"; + static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; + static std::string s_unselected_rows = "unselected.rows"; + int64_t num_rows_load_success = 0; + int64_t num_rows_load_filtered = 0; + int64_t num_rows_load_unselected = 0; + if (req.runtime_state->num_rows_load_total() > 0 || + req.runtime_state->num_rows_load_filtered() > 0 || + req.runtime_state->num_finished_range() > 0) { + params.__isset.load_counters = true; + + num_rows_load_success = req.runtime_state->num_rows_load_success(); + num_rows_load_filtered = req.runtime_state->num_rows_load_filtered(); + num_rows_load_unselected = req.runtime_state->num_rows_load_unselected(); + params.__isset.fragment_instance_reports = true; + TFragmentInstanceReport t; + t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id()); + t.__set_num_finished_range(cast_set(req.runtime_state->num_finished_range())); + t.__set_loaded_rows(req.runtime_state->num_rows_load_total()); + t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); + params.fragment_instance_reports.push_back(t); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 || + rs->num_finished_range() > 0) { + params.__isset.load_counters = true; + num_rows_load_success += rs->num_rows_load_success(); + num_rows_load_filtered += rs->num_rows_load_filtered(); + num_rows_load_unselected += rs->num_rows_load_unselected(); + params.__isset.fragment_instance_reports = true; + TFragmentInstanceReport t; + t.__set_fragment_instance_id(rs->fragment_instance_id()); + t.__set_num_finished_range(cast_set(rs->num_finished_range())); + t.__set_loaded_rows(rs->num_rows_load_total()); + t.__set_loaded_bytes(rs->num_bytes_load_total()); + params.fragment_instance_reports.push_back(t); + } + } + } + params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success)); + params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered)); + params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected)); + + if (!req.load_error_url.empty()) { + params.__set_tracking_url(req.load_error_url); + } + if (!req.first_error_msg.empty()) { + params.__set_first_error_msg(req.first_error_msg); + } + for (auto* rs : req.runtime_states) { + if (rs->wal_id() > 0) { + params.__set_txn_id(rs->wal_id()); + params.__set_label(rs->import_label()); + } + } + if (!req.runtime_state->export_output_files().empty()) { + params.__isset.export_files = true; + params.export_files = req.runtime_state->export_output_files(); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->export_output_files().empty()) { + params.__isset.export_files = true; + params.export_files.insert(params.export_files.end(), + rs->export_output_files().begin(), + rs->export_output_files().end()); + } + } + } + if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) { + params.__isset.commitInfos = true; + params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end()); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) { + params.__isset.commitInfos = true; + params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end()); + } + } + } + if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) { + params.__isset.errorTabletInfos = true; + params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end()); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) { + params.__isset.errorTabletInfos = true; + params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(), + rs_eti.end()); + } + } + } + if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) { + params.__isset.hive_partition_updates = true; + params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(), + hpu.end()); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) { + params.__isset.hive_partition_updates = true; + params.hive_partition_updates.insert(params.hive_partition_updates.end(), + rs_hpu.begin(), rs_hpu.end()); + } + } + } + if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) { + params.__isset.iceberg_commit_datas = true; + params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(), + icd.end()); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) { + params.__isset.iceberg_commit_datas = true; + params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), + rs_icd.begin(), rs_icd.end()); + } + } + } + + if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) { + params.__isset.mc_commit_datas = true; + params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end()); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) { + params.__isset.mc_commit_datas = true; + params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(), + rs_mcd.end()); + } + } + } + + req.runtime_state->get_unreported_errors(&(params.error_log)); + params.__isset.error_log = (!params.error_log.empty()); + + if (_exec_env->cluster_info()->backend_id != 0) { + params.__set_backend_id(_exec_env->cluster_info()->backend_id); + } + + TReportExecStatusResult res; + Status rpc_status; + + VLOG_DEBUG << "reportExecStatus params is " + << apache::thrift::ThriftDebugString(params).c_str(); + if (!exec_status.ok()) { + LOG(WARNING) << "report error status: " << exec_status.msg() + << " to coordinator: " << req.coord_addr + << ", query id: " << print_id(req.query_id); + } + try { + try { + (*coord)->reportExecStatus(res, params); + } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) { +#ifndef ADDRESS_SANITIZER + LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id) + << ", instance id: " << print_id(req.fragment_instance_id) << " to " + << req.coord_addr << ", err: " << e.what(); +#endif + rpc_status = coord->reopen(); + + if (!rpc_status.ok()) { + req.cancel_fn(rpc_status); + return; + } + (*coord)->reportExecStatus(res, params); + } + + rpc_status = Status::create(res.status); + } catch (apache::thrift::TException& e) { + rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}", + PrintThriftNetworkAddress(req.coord_addr), e.what()); + } + + if (!rpc_status.ok()) { + LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}", + print_id(req.query_id), rpc_status.to_string()); + req.cancel_fn(rpc_status); + } +} + Status PipelineFragmentContext::send_report(bool done) { Status exec_status = _query_ctx->exec_status(); // If plan is done successfully, but _is_report_success is false, @@ -2009,9 +2264,14 @@ Status PipelineFragmentContext::send_report(bool done) { .load_error_url = load_eror_url, .first_error_msg = first_error_msg, .cancel_fn = [this](const Status& reason) { cancel(reason); }}; - - return _report_status_cb( - req, std::dynamic_pointer_cast(shared_from_this())); + auto ctx = std::dynamic_pointer_cast(shared_from_this()); + return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() { + SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker()); + _coordinator_callback(req); + if (!req.done) { + ctx->refresh_next_report_time(); + } + }); } size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const { diff --git a/be/src/exec/pipeline/pipeline_fragment_context.h b/be/src/exec/pipeline/pipeline_fragment_context.h index 6372737c0e7794..01306799aa451b 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.h +++ b/be/src/exec/pipeline/pipeline_fragment_context.h @@ -52,18 +52,9 @@ class Dependency; class PipelineFragmentContext : public TaskExecutionContext { public: ENABLE_FACTORY_CREATOR(PipelineFragmentContext); - // Callback to report execution status of plan fragment. - // 'profile' is the cumulative profile, 'done' indicates whether the execution - // is done or still continuing. - // Note: this does not take a const RuntimeProfile&, because it might need to call - // functions like PrettyPrint() or to_thrift(), neither of which is const - // because they take locks. - using report_status_callback = std::function&&)>; PipelineFragmentContext(TUniqueId query_id, const TPipelineFragmentParams& request, std::shared_ptr query_ctx, ExecEnv* exec_env, - const std::function& call_back, - report_status_callback report_status_cb); + const std::function& call_back); ~PipelineFragmentContext() override; @@ -157,6 +148,9 @@ class PipelineFragmentContext : public TaskExecutionContext { } private: + void _coordinator_callback(const ReportStatusRequest& req); + std::string _to_http_path(const std::string& file_name) const; + void _release_resource(); Status _build_and_prepare_full_pipeline(ThreadPool* thread_pool); @@ -257,14 +251,6 @@ class PipelineFragmentContext : public TaskExecutionContext { std::atomic_bool _disable_period_report = true; std::atomic_uint64_t _previous_report_time = 0; - // This callback is used to notify the FE of the status of the fragment. - // For example: - // 1. when the fragment is cancelled, it will be called. - // 2. when the fragment is finished, it will be called. especially, when the fragment is - // a insert into select statement, it should notfiy FE every fragment's status. - // And also, this callback is called periodly to notify FE the load process. - report_status_callback _report_status_cb; - DescriptorTbl* _desc_tbl = nullptr; int _num_instances = 1; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 02245d627b2958..9fbc74281ce751 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -127,9 +127,6 @@ std::string to_load_error_http_path(const std::string& file_name) { return url.str(); } -using apache::thrift::TException; -using apache::thrift::transport::TTransportException; - static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info, std::unordered_set& query_set) { TFetchRunningQueriesResult rpc_result; @@ -349,276 +346,6 @@ void FragmentMgr::stop() { } } -std::string FragmentMgr::to_http_path(const std::string& file_name) { - std::stringstream url; - url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port - << "/api/_download_load?" - << "token=" << _exec_env->token() << "&file=" << file_name; - return url.str(); -} - -Status FragmentMgr::trigger_pipeline_context_report( - const ReportStatusRequest req, std::shared_ptr&& ctx) { - return _thread_pool->submit_func([this, req, ctx]() { - SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker()); - coordinator_callback(req); - if (!req.done) { - ctx->refresh_next_report_time(); - } - }); -} - -// There can only be one of these callbacks in-flight at any moment, because -// it is only invoked from the executor's reporting thread. -// Also, the reported status will always reflect the most recent execution status, -// including the final status when execution finishes. -void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { - DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", { - int random_seconds = req.status.is() ? 8 : 2; - LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id)); - std::this_thread::sleep_for(std::chrono::seconds(random_seconds)); - LOG_INFO("sleep done").tag("query_id", print_id(req.query_id)); - }); - - DCHECK(req.status.ok() || req.done); // if !status.ok() => done - if (req.coord_addr.hostname == "external") { - // External query (flink/spark read tablets) not need to report to FE. - return; - } - int callback_retries = 10; - const int sleep_ms = 1000; - Status exec_status = req.status; - Status coord_status; - std::unique_ptr coord = nullptr; - do { - coord = std::make_unique(_exec_env->frontend_client_cache(), - req.coord_addr, &coord_status); - if (!coord_status.ok()) { - std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); - } - } while (!coord_status.ok() && callback_retries-- > 0); - - if (!coord_status.ok()) { - std::stringstream ss; - UniqueId uid(req.query_id.hi, req.query_id.lo); - static_cast(req.cancel_fn(Status::InternalError( - "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(), - PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string()))); - return; - } - - TReportExecStatusParams params; - params.protocol_version = FrontendServiceVersion::V1; - params.__set_query_id(req.query_id); - params.__set_backend_num(req.backend_num); - params.__set_fragment_instance_id(req.fragment_instance_id); - params.__set_fragment_id(req.fragment_id); - params.__set_status(exec_status.to_thrift()); - params.__set_done(req.done); - params.__set_query_type(req.runtime_state->query_type()); - params.__isset.profile = false; - - DCHECK(req.runtime_state != nullptr); - - if (req.runtime_state->query_type() == TQueryType::LOAD) { - params.__set_loaded_rows(req.runtime_state->num_rows_load_total()); - params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); - } else { - DCHECK(!req.runtime_states.empty()); - if (!req.runtime_state->output_files().empty()) { - params.__isset.delta_urls = true; - for (auto& it : req.runtime_state->output_files()) { - params.delta_urls.push_back(to_http_path(it)); - } - } - if (!params.delta_urls.empty()) { - params.__isset.delta_urls = true; - } - } - - // load rows - static std::string s_dpp_normal_all = "dpp.norm.ALL"; - static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; - static std::string s_unselected_rows = "unselected.rows"; - int64_t num_rows_load_success = 0; - int64_t num_rows_load_filtered = 0; - int64_t num_rows_load_unselected = 0; - if (req.runtime_state->num_rows_load_total() > 0 || - req.runtime_state->num_rows_load_filtered() > 0 || - req.runtime_state->num_finished_range() > 0) { - params.__isset.load_counters = true; - - num_rows_load_success = req.runtime_state->num_rows_load_success(); - num_rows_load_filtered = req.runtime_state->num_rows_load_filtered(); - num_rows_load_unselected = req.runtime_state->num_rows_load_unselected(); - params.__isset.fragment_instance_reports = true; - TFragmentInstanceReport t; - t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id()); - t.__set_num_finished_range(cast_set(req.runtime_state->num_finished_range())); - t.__set_loaded_rows(req.runtime_state->num_rows_load_total()); - t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); - params.fragment_instance_reports.push_back(t); - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 || - rs->num_finished_range() > 0) { - params.__isset.load_counters = true; - num_rows_load_success += rs->num_rows_load_success(); - num_rows_load_filtered += rs->num_rows_load_filtered(); - num_rows_load_unselected += rs->num_rows_load_unselected(); - params.__isset.fragment_instance_reports = true; - TFragmentInstanceReport t; - t.__set_fragment_instance_id(rs->fragment_instance_id()); - t.__set_num_finished_range(cast_set(rs->num_finished_range())); - t.__set_loaded_rows(rs->num_rows_load_total()); - t.__set_loaded_bytes(rs->num_bytes_load_total()); - params.fragment_instance_reports.push_back(t); - } - } - } - params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success)); - params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered)); - params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected)); - - if (!req.load_error_url.empty()) { - params.__set_tracking_url(req.load_error_url); - } - if (!req.first_error_msg.empty()) { - params.__set_first_error_msg(req.first_error_msg); - } - for (auto* rs : req.runtime_states) { - if (rs->wal_id() > 0) { - params.__set_txn_id(rs->wal_id()); - params.__set_label(rs->import_label()); - } - } - if (!req.runtime_state->export_output_files().empty()) { - params.__isset.export_files = true; - params.export_files = req.runtime_state->export_output_files(); - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (!rs->export_output_files().empty()) { - params.__isset.export_files = true; - params.export_files.insert(params.export_files.end(), - rs->export_output_files().begin(), - rs->export_output_files().end()); - } - } - } - if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) { - params.__isset.commitInfos = true; - params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end()); - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) { - params.__isset.commitInfos = true; - params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end()); - } - } - } - if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) { - params.__isset.errorTabletInfos = true; - params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end()); - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) { - params.__isset.errorTabletInfos = true; - params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(), - rs_eti.end()); - } - } - } - if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) { - params.__isset.hive_partition_updates = true; - params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(), - hpu.end()); - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) { - params.__isset.hive_partition_updates = true; - params.hive_partition_updates.insert(params.hive_partition_updates.end(), - rs_hpu.begin(), rs_hpu.end()); - } - } - } - if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) { - params.__isset.iceberg_commit_datas = true; - params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(), - icd.end()); - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) { - params.__isset.iceberg_commit_datas = true; - params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), - rs_icd.begin(), rs_icd.end()); - } - } - } - - if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) { - params.__isset.mc_commit_datas = true; - params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end()); - } else if (!req.runtime_states.empty()) { - for (auto* rs : req.runtime_states) { - if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) { - params.__isset.mc_commit_datas = true; - params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(), - rs_mcd.end()); - } - } - } - - // Send new errors to coordinator - req.runtime_state->get_unreported_errors(&(params.error_log)); - params.__isset.error_log = (!params.error_log.empty()); - - if (_exec_env->cluster_info()->backend_id != 0) { - params.__set_backend_id(_exec_env->cluster_info()->backend_id); - } - - TReportExecStatusResult res; - Status rpc_status; - - VLOG_DEBUG << "reportExecStatus params is " - << apache::thrift::ThriftDebugString(params).c_str(); - if (!exec_status.ok()) { - LOG(WARNING) << "report error status: " << exec_status.msg() - << " to coordinator: " << req.coord_addr - << ", query id: " << print_id(req.query_id); - } - try { - try { - (*coord)->reportExecStatus(res, params); - } catch ([[maybe_unused]] TTransportException& e) { -#ifndef ADDRESS_SANITIZER - LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id) - << ", instance id: " << print_id(req.fragment_instance_id) << " to " - << req.coord_addr << ", err: " << e.what(); -#endif - rpc_status = coord->reopen(); - - if (!rpc_status.ok()) { - // we need to cancel the execution of this fragment - req.cancel_fn(rpc_status); - return; - } - (*coord)->reportExecStatus(res, params); - } - - rpc_status = Status::create(res.status); - } catch (TException& e) { - rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}", - PrintThriftNetworkAddress(req.coord_addr), e.what()); - } - - if (!rpc_status.ok()) { - LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}", - print_id(req.query_id), rpc_status.to_string()); - // we need to cancel the execution of this fragment - req.cancel_fn(rpc_status); - } -} - static void empty_function(RuntimeState*, Status*) {} Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, @@ -906,10 +633,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, params.query_options.single_backend_query); int64_t duration_ns = 0; std::shared_ptr context = std::make_shared( - query_ctx->query_id(), params, query_ctx, _exec_env, cb, - [this](const ReportStatusRequest& req, auto&& ctx) { - return this->trigger_pipeline_context_report(req, std::move(ctx)); - }); + query_ctx->query_id(), params, query_ctx, _exec_env, cb); { SCOPED_RAW_TIMER(&duration_ns); Status prepare_st = Status::OK(); @@ -1583,10 +1307,7 @@ Status FragmentMgr::rerun_fragment(const std::shared_ptr& gu } auto context = std::make_shared( - q_ctx->query_id(), info.params, q_ctx, _exec_env, info.finish_callback, - [this](const ReportStatusRequest& req, auto&& ctx) { - return this->trigger_pipeline_context_report(req, std::move(ctx)); - }); + q_ctx->query_id(), info.params, q_ctx, _exec_env, info.finish_callback); // Propagate the recursion stage so that runtime filters created by this PFC // carry the correct stage number. context->set_rec_cte_stage(info.stage); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 7444317d30495e..ab78c18555a640 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -133,9 +133,6 @@ class FragmentMgr : public RestMonitorIface { Status start_query_execution(const PExecPlanFragmentStartRequest* request); - Status trigger_pipeline_context_report(const ReportStatusRequest, - std::shared_ptr&&); - // Can be used in both version. MOCK_FUNCTION void cancel_query(const TUniqueId query_id, const Status reason); @@ -162,10 +159,6 @@ class FragmentMgr : public RestMonitorIface { Status sync_filter_size(const PSyncFilterSizeRequest* request); - std::string to_http_path(const std::string& file_name); - - void coordinator_callback(const ReportStatusRequest& req); - ThreadPool* get_thread_pool() { return _thread_pool.get(); } // When fragment mgr is going to stop, the _stop_background_threads_latch is set to 0 From 7673aa9a7c10cec9c13c2cce9f3a062a588da46c Mon Sep 17 00:00:00 2001 From: yiguolei Date: Wed, 15 Apr 2026 17:35:50 +0800 Subject: [PATCH 2/4] f --- be/test/exec/pipeline/pipeline_task_test.cpp | 5 +---- be/test/exec/pipeline/pipeline_test.cpp | 5 +---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp index ad15586f24eb5c..84ecb51ac0fba8 100644 --- a/be/test/exec/pipeline/pipeline_task_test.cpp +++ b/be/test/exec/pipeline/pipeline_task_test.cpp @@ -71,10 +71,7 @@ class PipelineTaskTest : public testing::Test { int fragment_id = 0; _context = std::make_shared( _query_id, TPipelineFragmentParams(), _query_ctx, ExecEnv::GetInstance(), - empty_function, - std::bind(std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), - ExecEnv::GetInstance()->fragment_mgr(), std::placeholders::_1, - std::placeholders::_2)); + empty_function); _runtime_state = std::make_unique( _query_id, fragment_id, _query_options, _query_ctx->query_globals, ExecEnv::GetInstance(), _query_ctx.get()); diff --git a/be/test/exec/pipeline/pipeline_test.cpp b/be/test/exec/pipeline/pipeline_test.cpp index 083679406b6038..4150e281a3539c 100644 --- a/be/test/exec/pipeline/pipeline_test.cpp +++ b/be/test/exec/pipeline/pipeline_test.cpp @@ -88,10 +88,7 @@ class PipelineTest : public testing::Test { int fragment_id = _next_fragment_id(); _context.push_back(std::make_shared( _query_id, TPipelineFragmentParams(), _query_ctx, ExecEnv::GetInstance(), - empty_function, - std::bind(std::mem_fn(&FragmentMgr::trigger_pipeline_context_report), - ExecEnv::GetInstance()->fragment_mgr(), std::placeholders::_1, - std::placeholders::_2))); + empty_function)); _runtime_state.push_back(RuntimeState::create_unique( _query_id, fragment_id, _query_options, _query_ctx->query_globals, ExecEnv::GetInstance(), _query_ctx.get())); From 7c30d672a8c32080f9fa08631f63c4d45141b8dc Mon Sep 17 00:00:00 2001 From: yiguolei Date: Wed, 15 Apr 2026 18:31:15 +0800 Subject: [PATCH 3/4] f --- be/test/exec/pipeline/pipeline_task_test.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp index 84ecb51ac0fba8..34d007be4c24a5 100644 --- a/be/test/exec/pipeline/pipeline_task_test.cpp +++ b/be/test/exec/pipeline/pipeline_task_test.cpp @@ -69,9 +69,9 @@ class PipelineTaskTest : public testing::Test { private: void _build_fragment_context() { int fragment_id = 0; - _context = std::make_shared( - _query_id, TPipelineFragmentParams(), _query_ctx, ExecEnv::GetInstance(), - empty_function); + _context = std::make_shared(_query_id, TPipelineFragmentParams(), + _query_ctx, ExecEnv::GetInstance(), + empty_function); _runtime_state = std::make_unique( _query_id, fragment_id, _query_options, _query_ctx->query_globals, ExecEnv::GetInstance(), _query_ctx.get()); From b72284c18bff1cd7ae2cba885c988e357163e3a0 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 16 Apr 2026 09:26:20 +0800 Subject: [PATCH 4/4] f --- be/src/exec/pipeline/pipeline_fragment_context.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index a068b3d8048b71..efb74e9b8177cd 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -29,6 +29,7 @@ // IWYU pragma: no_include #include #include +#include #include #include // IWYU pragma: keep