Skip to content

Commit

Permalink
[BugFix] Fix exec state report lost lead to ingestion status getting …
Browse files Browse the repository at this point in the history
…stuck

Signed-off-by: meegoo <meegoo.sr@gmail.com>
  • Loading branch information
meegoo committed Dec 8, 2023
1 parent 0f22dff commit fd2f503
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 15 deletions.
22 changes: 19 additions & 3 deletions be/src/exec/pipeline/exec_state_reporter.cpp
Expand Up @@ -297,11 +297,27 @@ ExecStateReporter::ExecStateReporter() {
if (!status.ok()) {
LOG(FATAL) << "Cannot create thread pool for ExecStateReport: error=" << status.to_string();
}

status = ThreadPoolBuilder("priority_ex_state_report") // priority exec state reporter with infinite queue
.set_min_threads(1)
.set_max_threads(2)
.set_idle_timeout(MonoDelta::FromMilliseconds(2000))
.build(&_priority_thread_pool);
if (!status.ok()) {
LOG(FATAL) << "Cannot create thread pool for priority ExecStateReport: error=" << status.to_string();
}
}

void ExecStateReporter::submit(std::function<void()>&& report_task) {
auto st = _thread_pool->submit_func(std::move(report_task));
st.permit_unchecked_error();
// Hands report_task over to one of the reporter thread pools.
//  - priority == false: the task goes to the regular pool and a failed
//    submission is deliberately ignored (best effort).
//  - priority == true: the task goes to the dedicated priority pool and a
//    failed submission is logged as a warning.
void ExecStateReporter::submit(std::function<void()>&& report_task, bool priority) {
    if (!priority) {
        // Best-effort path: the submission status is intentionally left unchecked.
        auto submit_status = _thread_pool->submit_func(std::move(report_task));
        submit_status.permit_unchecked_error();
        return;
    }

    auto submit_status = _priority_thread_pool->submit_func(std::move(report_task));
    if (submit_status.ok()) {
        return;
    }
    LOG(WARNING) << "Failed to submit report task: " << submit_status.get_error_msg();
}

} // namespace starrocks::pipeline
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/exec_state_reporter.h
Expand Up @@ -38,7 +38,7 @@ class ExecStateReporter {
static Status report_exec_status(const TReportExecStatusParams& params, ExecEnv* exec_env,
const TNetworkAddress& fe_addr);

void submit(std::function<void()>&& report_task);
// Submits report_task to a reporter thread pool.
// When priority is true the task is submitted to the dedicated priority
// thread pool instead of the regular one; priority defaults to false so
// existing single-argument callers keep using the regular pool.
void submit(std::function<void()>&& report_task, bool priority = false);

// STREAM MV
static TMVMaintenanceTasks create_report_epoch_params(const QueryContext* query_ctx,
Expand All @@ -48,5 +48,6 @@ class ExecStateReporter {

private:
std::unique_ptr<ThreadPool> _thread_pool;
std::unique_ptr<ThreadPool> _priority_thread_pool;
};
} // namespace starrocks::pipeline
38 changes: 27 additions & 11 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Expand Up @@ -320,21 +320,37 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
auto fragment_id = fragment_ctx->fragment_instance_id();

// Task that sends the exec state report to the FE.
//
// A report is attempted once. Only a final ("done") report that fails with an
// error other than "not found" is retried, and the total number of attempts is
// bounded by kMaxReportAttempts so that a persistently failing report cannot
// occupy a reporter thread forever.
auto report_task = [=]() {
    constexpr int kMaxReportAttempts = 3;
    // Only the final report is worth retrying; intermediate reports are best effort.
    const bool is_done_report = params.__isset.done && params.done;

    for (int retry_times = 0; retry_times < kMaxReportAttempts; ++retry_times) {
        auto status = ExecStateReporter::report_exec_status(params, exec_env, fe_addr);
        if (status.ok()) {
            LOG(INFO) << "[Driver] Succeed to report exec state: fragment_instance_id=" << print_id(fragment_id)
                      << ", is_done=" << params.done;
            return;
        }

        if (status.is_not_found()) {
            // The query is not found on the receiving side; this outcome is not retried.
            LOG(INFO) << "[Driver] Fail to report exec state due to query not found: fragment_instance_id="
                      << print_id(fragment_id);
            return;
        }

        LOG(WARNING) << "[Driver] Fail to report exec state: fragment_instance_id=" << print_id(fragment_id)
                     << ", status: " << status.to_string() << ", retry_times=" << retry_times;
        if (!is_done_report) {
            return;
        }
        // Done report failed: fall through to the next bounded attempt.
    }
};

this->_exec_state_reporter->submit(std::move(report_task));
// If this is the final (done) exec state report, it must be executed with priority
// and retried as much as possible to ensure it succeeds.
// Otherwise, the query or ingestion status may get stuck.
this->_exec_state_reporter->submit(std::move(report_task), done);
VLOG(1) << "[Driver] Submit exec state report task: fragment_instance_id=" << print_id(fragment_id)
<< ", is_done=" << done;
}

void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) {
Expand Down

0 comments on commit fd2f503

Please sign in to comment.