[fix](executor) prevent BE crash when split process throws unexpectedly #62044
eldenmoon wants to merge 2 commits into apache:master
Pull request overview
Prevents backend worker-thread termination (and potential BE crash) when split->process() throws unexpectedly in the time-sharing scan executor by converting thrown exceptions into split failure statuses.
Changes:
- Wrap PrioritizedSplitRunner::process() invocation in a try/catch to prevent exceptions from escaping the dispatch thread.
- Map doris::Exception (including special-casing MEM_ALLOC_FAILED) and other exceptions to appropriate Status errors returned via Result.
- Keep enable_thread_catch_bad_alloc scoped around split->process() to preserve existing memory-exception behavior.
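The three changes above can be sketched in isolation. The following is a minimal sketch only, not the patch itself: `SketchStatus`, `SketchException`, `sketch_catch_bad_alloc`, `CounterGuard`, and `run_guarded` are hypothetical stand-ins for the Doris `Status`, `doris::Exception`, `enable_thread_catch_bad_alloc`, `Defer`, and the wrapped `split->process()` call.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for a status type; the real Doris Status is richer.
struct SketchStatus {
    enum class Code { Ok, MemoryLimitExceeded, InternalError };
    Code code = Code::Ok;
    std::string msg;
};

// Hypothetical stand-in for doris::Exception: carries a flag instead of an error code.
struct SketchException : std::runtime_error {
    bool mem_alloc_failed;
    SketchException(bool mem, const std::string& what)
            : std::runtime_error(what), mem_alloc_failed(mem) {}
};

// Hypothetical stand-in for the thread-local enable_thread_catch_bad_alloc counter.
thread_local int sketch_catch_bad_alloc = 0;

// RAII guard: the counter is restored on normal return and on throw alike.
struct CounterGuard {
    CounterGuard() { ++sketch_catch_bad_alloc; }
    ~CounterGuard() {
        --sketch_catch_bad_alloc;
        assert(sketch_catch_bad_alloc >= 0);
    }
};

// Runs `process` and converts anything it throws into a status, so no
// exception escapes to the calling (dispatch) thread.
template <typename Fn>
SketchStatus run_guarded(Fn&& process) {
    try {
        CounterGuard guard;
        std::forward<Fn>(process)();
        return {};
    } catch (const SketchException& e) {  // most specific handler first
        if (e.mem_alloc_failed) {
            return {SketchStatus::Code::MemoryLimitExceeded, e.what()};
        }
        return {SketchStatus::Code::InternalError, e.what()};
    } catch (const std::exception& e) {
        return {SketchStatus::Code::InternalError, e.what()};
    } catch (...) {
        return {SketchStatus::Code::InternalError, "unknown exception"};
    }
}
```

Calling `run_guarded([] { throw std::runtime_error("boom"); })` returns an `InternalError` status instead of terminating the thread, and the guard leaves the counter at its previous value.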
be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
    auto blocked_future_result = [&]() -> Result<SharedListenableFuture<Void>> {
        try {
            doris::enable_thread_catch_bad_alloc++;
            Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};
            return split->process();
        } catch (const doris::Exception& e) {
            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
                return unexpected(Status::MemoryLimitExceeded(
                        "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, "
                        "__FUNCTION__:{}",
                        e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__));
            }
            return unexpected(e.to_status());
        } catch (const std::exception& e) {
This try/catch block duplicates the exception-to-Status mapping logic already implemented in common/exception.h (including the enable_thread_catch_bad_alloc guard and the "PreCatch" message). To avoid future divergence, consider factoring the conversion into a shared helper (e.g., a function that converts an Exception/std::exception to Status) and reuse it here for the Result<...> error path.
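One common way to factor out such a mapping is a single function that rethrows the in-flight exception and classifies it, so every call site shares the same handlers. The sketch below illustrates that general technique under stated assumptions: `MiniStatus`, `MiniException`, `status_from_current_exception`, and `guarded_call` are hypothetical names, and nothing here is taken from common/exception.h.

```cpp
#include <exception>
#include <stdexcept>
#include <string>

// Hypothetical, simplified status: not the Doris Status class.
struct MiniStatus {
    bool ok = true;
    bool memory_limit = false;
    std::string msg;
};

// Hypothetical exception type standing in for doris::Exception.
struct MiniException : std::runtime_error {
    bool mem_alloc_failed;
    MiniException(bool mem, const std::string& what)
            : std::runtime_error(what), mem_alloc_failed(mem) {}
};

// Shared conversion helper. It must only be called from inside a catch
// block: `throw;` rethrows the exception currently being handled, and
// calling it with no active exception would invoke std::terminate.
MiniStatus status_from_current_exception() {
    try {
        throw;
    } catch (const MiniException& e) {
        return {false, e.mem_alloc_failed, e.what()};
    } catch (const std::exception& e) {
        return {false, false, e.what()};
    } catch (...) {
        return {false, false, "unknown exception"};
    }
}

// Any call site reduces to one catch-all that delegates to the helper.
template <typename Fn>
MiniStatus guarded_call(Fn&& fn) {
    try {
        fn();
        return {};
    } catch (...) {
        return status_from_current_exception();
    }
}
```

With this shape, adding a new exception category means touching one function rather than every try/catch that performs the conversion.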
Catch exceptions around split->process() in TimeSharingTaskExecutor and convert them to split failure status, while keeping MEM_ALLOC_FAILED mapped to MemoryLimitExceeded. This avoids worker thread termination and BE crash for cases such as:
```
terminate called after throwing an instance of 'doris::Exception'
  what(): [E-7412] assert cast err:[E-7412] Bad cast from ...
doris::vectorized::ScannerSplitRunner::process_for(std::chrono::duration<long, std::ratio<1l, 1000000000l> >) at /home/zcp/repo_center/doris_release/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:420
10# doris::vectorized::PrioritizedSplitRunner::process() at /home/zcp/repo_center/doris_release/doris/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.cpp:104
11# doris::vectorized::TimeSharingTaskExecutor::_dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp:568
12#
```
Force-pushed from c1429c7 to a67425a
…executor.cpp Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
/review
run buildall
TPC-H: Total hot run time: 29125 ms
TPC-DS: Total hot run time: 179999 ms
skip check_coverage
/review
Found 1 blocking issue.
- [high] TimeSharingTaskExecutor now swallows exceptions from split->process() and routes them to _split_finished(split, status), but for scan splits that bypasses the existing ScannerScheduler error path that records the failure on ScanTask and pushes it back into ScannerContext. ScannerContext::submit_scan_task() increments _in_flight_tasks_num, and that counter is decremented only in ScannerContext::push_back_scan_task(). ScannerSplitRunner::close() is empty, so when _scanner_scan() (or anything inside _scan_func) throws, the worker no longer crashes, but the scan task is never pushed back, the error never reaches the consumer, and the context can hang waiting for an in-flight task that will never return.
Critical checkpoint conclusions:
- Goal of the task: Partially accomplished. The patch prevents worker-thread termination, but the scanner failure path is no longer completed correctly, and the added test does not cover the affected scan path.
- Modification size/focus: Small and focused.
- Concurrency: No new lock-order issue found in the catch block itself, but split/task lifecycle accounting becomes inconsistent on the exception path.
- Lifecycle management: Failed for ScannerSplitRunner on the new path because the task never re-enters ScannerContext::push_back_scan_task().
- Configuration: Not applicable.
- Compatibility: Not applicable.
- Parallel code paths: Problematic. Generic ThrowingSplitRunner in the new test behaves differently from ScannerSplitRunner, whose close() is empty and whose failure propagation relies on the scheduler path.
- Special conditional checks: Catch ordering is correct (doris::Exception -> std::exception -> ...).
- Test coverage: Insufficient. The new test covers generic exception-to-status conversion only; it does not cover scanner splits or the promised MEM_ALLOC_FAILED remapping branch.
- Observability: Warning log exists, but the real scanner error is no longer surfaced to the consumer on this path.
- Transaction/persistence/data writes/FE-BE variable passing: Not applicable.
- Performance: No material issue found in this small change.
- Other issues: None beyond the blocking lifecycle/error-propagation issue above.
      }

-     Result<SharedListenableFuture<Void>> blocked_future_result = split->process();
+     auto blocked_future_result = [&]() -> Result<SharedListenableFuture<Void>> {
Catching split->process() here avoids the crash, but it also changes the scanner control flow in a way that loses the failure signal. For ScannerSplitRunner, the normal error path is inside ScannerScheduler::submit(): the work_func lambda catches _scanner_scan() failures, calls scanner_ref->set_status(status), and then ctx->push_back_scan_task(scanner_ref). That push-back is what decrements ScannerContext::_in_flight_tasks_num and wakes the consumer.
On this new path we skip that entire layer and go straight to _split_finished(split, status). ScannerSplitRunner::close() is empty, so nothing completes its _completion_future, nothing pushes the ScanTask back into the context, and the consumer never sees the error. In practice this can trade the BE crash for a hung scan, because submit_scan_task() increments _in_flight_tasks_num and push_back_scan_task() is the only place that decrements it.
The new unit test does not catch this because ThrowingSplitRunner::close() marks itself finished, which is not how ScannerSplitRunner behaves.
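The accounting problem described in this comment can be modeled with a toy counter. This is a deliberately simplified sketch, not Doris code: `ToyScanContext`, `run_via_scheduler_path`, and `run_via_executor_catch` are hypothetical, and the success path pushes the task back directly, which is a simplification of the real scheduler flow.

```cpp
#include <functional>
#include <stdexcept>

// Toy model of the context: submit() increments the in-flight counter and
// push_back() is the only operation that decrements it.
struct ToyScanContext {
    int in_flight = 0;
    bool consumer_saw_error = false;

    void submit() { ++in_flight; }
    void push_back(bool failed) {
        --in_flight;
        if (failed) consumer_saw_error = true;
    }
};

// Scheduler-layer handling: the failure is caught next to the scan call,
// recorded, and the task is pushed back so the consumer is woken.
void run_via_scheduler_path(ToyScanContext& ctx, const std::function<void()>& scan) {
    ctx.submit();
    try {
        scan();
        ctx.push_back(false);
    } catch (...) {
        ctx.push_back(true);
    }
}

// Executor-layer handling: the exception is caught one layer further out.
// The split is marked finished, but in this model (as with an empty close())
// nothing pushes the task back.
void run_via_executor_catch(ToyScanContext& ctx, const std::function<void()>& scan) {
    ctx.submit();
    try {
        scan();
        ctx.push_back(false);
    } catch (...) {
        // Intentionally empty: no push_back, so in_flight stays incremented.
    }
}
```

With a throwing `scan`, the first function leaves `in_flight` at 0 with the error visible to the consumer, while the second leaves `in_flight` at 1 and the error unseen, which is the hang condition the comment warns about.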
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
kaka11chen left a comment
Why wasn't this exception caught here?
    auto sumbit_task = [&]() {
        auto work_func = [scanner_ref = scan_task, ctx]() {
            auto status = [&] {
                **RETURN_IF_CATCH_EXCEPTION**(_scanner_scan(ctx, scanner_ref));
                return Status::OK();
            }();
            if (!status.ok()) {
                scanner_ref->set_status(status);
                ctx->push_back_scan_task(scanner_ref);
                return true;
            }
            return scanner_ref->is_eos();
        };
        SimplifiedScanTask simple_scan_task = {work_func, ctx, scan_task};
        return this->submit_scan_task(simple_scan_task);
    };