Skip to content

Commit

Permalink
[BugFix] Fix exec state report lost lead to ingestion status getting …
Browse files Browse the repository at this point in the history
…stuck (#36688)

Signed-off-by: meegoo <meegoo.sr@gmail.com>
  • Loading branch information
meegoo committed Jan 5, 2024
1 parent 4a82571 commit d088b3f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
17 changes: 15 additions & 2 deletions be/src/exec/pipeline/exec_state_reporter.cpp
Expand Up @@ -302,10 +302,23 @@ ExecStateReporter::ExecStateReporter() {
if (!status.ok()) {
LOG(FATAL) << "Cannot create thread pool for ExecStateReport: error=" << status.to_string();
}

status = ThreadPoolBuilder("priority_ex_state_report") // priority exec state reporter with infinite queue
.set_min_threads(1)
.set_max_threads(2)
.set_idle_timeout(MonoDelta::FromMilliseconds(2000))
.build(&_priority_thread_pool);
if (!status.ok()) {
LOG(FATAL) << "Cannot create thread pool for priority ExecStateReport: error=" << status.to_string();
}
}

void ExecStateReporter::submit(std::function<void()>&& report_task) {
(void)_thread_pool->submit_func(std::move(report_task));
// Enqueue `report_task` on one of the reporter's two thread pools.
//
// @param report_task  callable to run asynchronously; it is moved into the chosen pool.
// @param priority     when true the task goes to `_priority_thread_pool`,
//                     otherwise to `_thread_pool`.
//
// The value returned by `submit_func` is intentionally discarded, so a rejected
// submission is not reported to the caller.
void ExecStateReporter::submit(std::function<void()>&& report_task, bool priority) {
    // Resolve the target pool first so there is a single submission call site.
    auto& target_pool = priority ? _priority_thread_pool : _thread_pool;
    (void)target_pool->submit_func(std::move(report_task));
}

} // namespace starrocks::pipeline
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/exec_state_reporter.h
Expand Up @@ -38,7 +38,7 @@ class ExecStateReporter {
static Status report_exec_status(const TReportExecStatusParams& params, ExecEnv* exec_env,
const TNetworkAddress& fe_addr);

void submit(std::function<void()>&& report_task);
void submit(std::function<void()>&& report_task, bool priority = false);

// STREAM MV
static TMVMaintenanceTasks create_report_epoch_params(const QueryContext* query_ctx,
Expand All @@ -48,5 +48,6 @@ class ExecStateReporter {

private:
std::unique_ptr<ThreadPool> _thread_pool;
std::unique_ptr<ThreadPool> _priority_thread_pool;
};
} // namespace starrocks::pipeline
34 changes: 24 additions & 10 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Expand Up @@ -320,21 +320,35 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
auto fragment_id = fragment_ctx->fragment_instance_id();

auto report_task = [=]() {
auto status = ExecStateReporter::report_exec_status(params, exec_env, fe_addr);
if (!status.ok()) {
if (status.is_not_found()) {
LOG(INFO) << "[Driver] Fail to report exec state due to query not found: fragment_instance_id="
<< print_id(fragment_id);
int retry_times = 0;
while (retry_times++ < 3) {
auto status = ExecStateReporter::report_exec_status(params, exec_env, fe_addr);
if (!status.ok()) {
if (status.is_not_found()) {
LOG(INFO) << "[Driver] Fail to report exec state due to query not found: fragment_instance_id="
<< print_id(fragment_id);
} else {
LOG(WARNING) << "[Driver] Fail to report exec state: fragment_instance_id=" << print_id(fragment_id)
<< ", status: " << status.to_string() << ", retry_times=" << retry_times;
// if it is done exec state report, we should retry
if (params.__isset.done && params.done) {
continue;
}
}
} else {
LOG(WARNING) << "[Driver] Fail to report exec state: fragment_instance_id=" << print_id(fragment_id)
<< ", status: " << status.to_string();
LOG(INFO) << "[Driver] Succeed to report exec state: fragment_instance_id=" << print_id(fragment_id)
<< ", is_done=" << params.done;
}
} else {
LOG(INFO) << "[Driver] Succeed to report exec state: fragment_instance_id=" << print_id(fragment_id);
break;
}
};

this->_exec_state_reporter->submit(std::move(report_task));
// If this is a done exec state report, we need to ensure that it is executed with priority
// and retried as much as possible so that it succeeds.
// Otherwise, the query or ingestion status may get stuck.
this->_exec_state_reporter->submit(std::move(report_task), done);
VLOG(1) << "[Driver] Submit exec state report task: fragment_instance_id=" << print_id(fragment_id)
<< ", is_done=" << done;
}

void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx, bool* done) {
Expand Down

0 comments on commit d088b3f

Please sign in to comment.