diff --git a/be/src/exec/pipeline/exec_state_reporter.cpp b/be/src/exec/pipeline/exec_state_reporter.cpp
index d5d88c25a205eb..63c3250fc22683 100644
--- a/be/src/exec/pipeline/exec_state_reporter.cpp
+++ b/be/src/exec/pipeline/exec_state_reporter.cpp
@@ -297,11 +297,29 @@ ExecStateReporter::ExecStateReporter() {
     if (!status.ok()) {
         LOG(FATAL) << "Cannot create thread pool for ExecStateReport: error=" << status.to_string();
     }
+
+    status = ThreadPoolBuilder("priority_ex_state_report") // priority exec state reporter with infinite queue
+                     .set_min_threads(1)
+                     .set_max_threads(2)
+                     .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
+                     .build(&_priority_thread_pool);
+    if (!status.ok()) {
+        LOG(FATAL) << "Cannot create thread pool for priority ExecStateReport: error=" << status.to_string();
+    }
 }
 
-void ExecStateReporter::submit(std::function<void()>&& report_task) {
-    auto st = _thread_pool->submit_func(std::move(report_task));
-    st.permit_unchecked_error();
+// Queues `report_task` on the priority pool when `priority` is true, otherwise on the regular pool.
+// A failed priority submission is logged; the status of a regular submission is intentionally ignored.
+void ExecStateReporter::submit(std::function<void()>&& report_task, bool priority) {
+    if (priority) {
+        auto st = _priority_thread_pool->submit_func(std::move(report_task));
+        if (!st.ok()) {
+            LOG(WARNING) << "Failed to submit report task: " << st.get_error_msg();
+        }
+    } else {
+        auto st = _thread_pool->submit_func(std::move(report_task));
+        st.permit_unchecked_error();
+    }
 }
 
 } // namespace starrocks::pipeline
diff --git a/be/src/exec/pipeline/exec_state_reporter.h b/be/src/exec/pipeline/exec_state_reporter.h
index 3060eefd04b933..fe718e52d78e85 100644
--- a/be/src/exec/pipeline/exec_state_reporter.h
+++ b/be/src/exec/pipeline/exec_state_reporter.h
@@ -38,7 +38,7 @@ class ExecStateReporter {
     static Status report_exec_status(const TReportExecStatusParams& params, ExecEnv* exec_env,
                                      const TNetworkAddress& fe_addr);
 
-    void submit(std::function<void()>&& report_task);
+    void submit(std::function<void()>&& report_task, bool priority = false);
 
     // STREAM MV
     static TMVMaintenanceTasks create_report_epoch_params(const QueryContext* query_ctx,
@@ -48,5 +48,6 @@ class ExecStateReporter {
 
 private:
     std::unique_ptr<ThreadPool> _thread_pool;
+    std::unique_ptr<ThreadPool> _priority_thread_pool;
 };
 } // namespace starrocks::pipeline
diff --git a/be/src/exec/pipeline/pipeline_driver_executor.cpp b/be/src/exec/pipeline/pipeline_driver_executor.cpp
index a255a2d74f885b..fafafb4eb40834 100644
--- a/be/src/exec/pipeline/pipeline_driver_executor.cpp
+++ b/be/src/exec/pipeline/pipeline_driver_executor.cpp
@@ -320,21 +320,38 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
     auto fragment_id = fragment_ctx->fragment_instance_id();
 
     auto report_task = [=]() {
-        auto status = ExecStateReporter::report_exec_status(params, exec_env, fe_addr);
-        if (!status.ok()) {
-            if (status.is_not_found()) {
-                LOG(INFO) << "[Driver] Fail to report exec state due to query not found: fragment_instance_id="
-                          << print_id(fragment_id);
+        // Only a final ("done") report is retried after a failure. The attempt counter advances on
+        // every pass, so a persistently failing report gives up after kMaxReportAttempts calls instead
+        // of spinning on a reporter thread forever.
+        constexpr int kMaxReportAttempts = 3;
+        for (int retry_times = 0; retry_times < kMaxReportAttempts; ++retry_times) {
+            auto status = ExecStateReporter::report_exec_status(params, exec_env, fe_addr);
+            if (!status.ok()) {
+                if (status.is_not_found()) {
+                    LOG(INFO) << "[Driver] Fail to report exec state due to query not found: fragment_instance_id="
+                              << print_id(fragment_id);
+                } else {
+                    LOG(WARNING) << "[Driver] Fail to report exec state: fragment_instance_id=" << print_id(fragment_id)
+                                 << ", status: " << status.to_string() << ", retry_times=" << retry_times;
+                    // if it is done exec state report, we should retry
+                    if (params.__isset.done && params.done) {
+                        continue; // `continue` runs ++retry_times and re-checks the attempt bound.
+                    }
+                }
             } else {
-                LOG(WARNING) << "[Driver] Fail to report exec state: fragment_instance_id=" << print_id(fragment_id)
-                             << ", status: " << status.to_string();
+                LOG(INFO) << "[Driver] Succeed to report exec state: fragment_instance_id=" << print_id(fragment_id)
+                          << ", is_done=" << params.done;
             }
-        } else {
-            LOG(INFO) << "[Driver] Succeed to report exec state: fragment_instance_id=" << print_id(fragment_id);
-        }
+            break;
+        }
     };
 
-    this->_exec_state_reporter->submit(std::move(report_task));
+    // If this is the done exec state report, we need to ensure that it is executed with priority
+    // and is retried (a bounded number of times) to maximize the chance of success.
+    // Otherwise, it may result in the query or ingestion status getting stuck.
+    this->_exec_state_reporter->submit(std::move(report_task), done);
+    VLOG(1) << "[Driver] Submit exec state report task: fragment_instance_id=" << print_id(fragment_id)
+            << ", is_done=" << done;
 }
 
 void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) {