[Chore](pipeline) make wake up do not return error #62190

BiteTheDDDDt merged 3 commits into apache:master from
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:

run buildall

/review
Pull request overview
This PR hardens the pipeline dependency wake-up path by making Dependency::set_ready() non-throwing and adding exception handling around task wake-up to avoid leaving tasks stuck in BLOCKED state.
Changes:
- Mark `Dependency::set_ready()` as `noexcept`.
- Add try/catch around `PipelineTask::wake_up()` to cancel the query if wake-up fails for a non-finalized task.
- Handle both Doris exceptions and standard/unexpected exceptions during wake-up.
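The pattern these changes describe can be sketched standalone as follows. All names here are illustrative stand-ins, not the actual Doris classes: a `noexcept` `set_ready()` that wraps the wake-up call in try/catch and falls back to cancelling the query so a blocked task cannot be stranded by an exception.

```cpp
#include <cassert>
#include <exception>
#include <new>
#include <string>

// Hypothetical minimal types; the real PipelineTask/QueryContext are far richer.
struct Query {
    bool cancelled = false;
    void cancel(const std::string& /*reason*/) { cancelled = true; }
};

struct Task {
    Query* query;
    bool finalized = false;
    bool throw_on_wake = false;
    bool woken = false;
    void wake_up() {
        if (throw_on_wake) throw std::bad_alloc();
        woken = true;
    }
};

struct Dependency {
    bool ready = false;
    // noexcept: exceptions from wake-up are handled here, never propagated.
    void set_ready(Task* t) noexcept {
        ready = true;
        try {
            t->wake_up();
        } catch (const std::exception&) {
            // A finalized task missing a wake-up is benign; otherwise cancel
            // the query so it does not hang in BLOCKED state forever.
            if (!t->finalized) {
                t->query->cancel("wake_up failed");
            }
        }
    }
};
```

Usage-wise, the interesting property is that `set_ready()` always completes: the caller never sees an exception, and failure is converted into query cancellation rather than a hang.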
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| be/src/exec/pipeline/dependency.h | Marks Dependency::set_ready() as noexcept. |
| be/src/exec/pipeline/dependency.cpp | Wraps task wake-up logic with exception handling and best-effort recovery via query cancellation. |
Found 1 issue.
be/src/exec/pipeline/dependency.cpp: the new recovery path treats only `FINALIZED` tasks as benign, but `PipelineTask::close()` moves tasks to `FINISHED` before `finalize()` runs. A delayed dependency wake-up in that window can return `Task state transition from FINISHED to RUNNABLE is not allowed`, and this patch now escalates that benign late notification into `frag->cancel(...)`, turning task completion races into query failures.
Critical checkpoint conclusions:
- Goal / proof: The goal is to make `Dependency::set_ready()` non-throwing and avoid hangs. The current code does not fully accomplish that safely because it can incorrectly cancel a completing query. No new test covers the delayed wake-up / completion race.
- Small / focused change: Yes, the patch is small and localized.
- Concurrency: Applicable. The change is in a concurrent wake-up path; lock scope and ordering remain small, but the completion-state classification in the race window is incorrect.
- Lifecycle / static init: No static-init concern. Task lifecycle is central here, and the review finding is in the `FINISHED -> FINALIZED` window.
- Configuration: Not applicable.
- Compatibility / storage format: Not applicable.
- Parallel code paths: No additional analogous path requiring the same change was identified.
- Special conditional checks: The new `t->is_finalized()` check is too narrow for the intended "already completed" case.
- Test coverage: Missing targeted coverage for delayed wake-up and cancellation/completion races.
- Observability: The added warning logs include the dependency name and status; observability is otherwise adequate.
- Transaction / persistence: Not applicable.
- Data writes / atomicity: Not applicable.
- FE-BE variable passing: Not applicable.
- Performance: No major performance issue identified in this patch.
- Other issues: None beyond the false-positive cancellation above.
```cpp
    local_block_task.swap(_blocked_task);
}
for (auto task : local_block_task) {
    if (auto t = task.lock()) {
```
`t->is_finalized()` is too narrow for the new "benign late wake-up" check. `PipelineTask::close()` sets the state to `FINISHED` before `finalize()` flips it to `FINALIZED` (pipeline_task.cpp), so a delayed dependency notification can legitimately hit a task that is already completed but not yet finalized. In that window `wake_up()` returns `Task state transition from FINISHED to RUNNABLE is not allowed`, and this code now cancels the whole fragment even though the task has already finished.
Please treat the pre-finalize `FINISHED` state as completed too, or key off the specific state-transition failure instead of only `is_finalized()`.
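The race window above can be shown with a tiny state machine. This is a sketch with hypothetical names, not the real `PipelineTask`: `close()` moves the task to `FINISHED` and only a later `finalize()` moves it to `FINALIZED`, so a "completed" check keyed solely on `FINALIZED` misclassifies tasks in between.

```cpp
#include <cassert>

// Illustrative two-step completion, mirroring the FINISHED -> FINALIZED
// window the review describes.
enum class TaskState { BLOCKED, RUNNABLE, FINISHED, FINALIZED };

struct Task {
    TaskState state = TaskState::BLOCKED;
    void close()    { state = TaskState::FINISHED; }   // step 1 of completion
    void finalize() { state = TaskState::FINALIZED; }  // step 2, may run later
};

// Too narrow: mirrors checking only is_finalized().
bool is_completed_narrow(const Task& t) {
    return t.state == TaskState::FINALIZED;
}

// Broadened check suggested by the review: a FINISHED task is also done
// and a late wake-up for it is benign, not a reason to cancel the query.
bool is_completed(const Task& t) {
    return t.state == TaskState::FINISHED || t.state == TaskState::FINALIZED;
}
```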
/review

run buildall
Found 1 issue that should be fixed before merge.
Critical checkpoints:
- Goal / correctness: Not satisfied. The new noexcept path still allows blocked tasks to be stranded if an exception aborts the wake-up loop before fragment cancellation.
- Minimality / focus: Yes. The change is small and localized to `Dependency::set_ready()`.
- Concurrency: Risk remains. `_ready` is published and `_blocked_task` is cleared before every waiter is safely handled, so an exception can break the task lifecycle.
- Lifecycle / initialization: No new lifecycle or static initialization issues seen.
- Configuration: N/A.
- Compatibility: N/A.
- Parallel paths: No additional matching path updates appear required.
- Special checks: The new catch blocks do not guarantee cancellation or recovery on the exception path.
- Test coverage: No test covers the `wake_up` exception path.
- Observability: Non-OK status logging is useful, but the `std::exception` log says "cancelling query" even though this path does not cancel.
- Transaction / persistence: N/A.
- Data writes / modifications: N/A.
- FE-BE variable passing: N/A.
- Performance: No primary hot-path regression identified.
- Other issues: None beyond the exception-handling hang risk above.
Overall opinion: not ready to merge as-is.
```cpp
} catch (const std::exception& e) {
    // Non-Doris exceptions (e.g. std::bad_alloc from scheduler submit path).
    LOG(WARNING) << "Dependency::set_ready(): unexpected exception during wake_up, "
                    "cancelling query. dep="
```
Once `_ready` is set and `_blocked_task` is swapped out, any exception that escapes one iteration aborts the rest of `local_block_task`. The outer catch then only logs and returns. A concrete case is `std::bad_alloc` thrown from `t->wake_up()` or from the logging/cancel code in this loop: the current task is never resubmitted, the dependency is already marked ready, and no later `set_ready()` call will revisit the dropped waiters. That leaves the fragment hanging instead of cancelling it. The exception handling needs to be per task, with fragment cancellation or equivalent unblock logic before continuing to the next waiter.
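The per-task structure being asked for can be sketched like this, with hypothetical types standing in for the pipeline classes: the try/catch sits inside the loop, so one waiter's failure is handled (here via a cancel callback) and the remaining waiters still get woken rather than being silently dropped.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <new>
#include <vector>

// Illustrative waiter; the real code holds weak_ptr<PipelineTask>.
struct Waiter {
    bool woken = false;
    bool throws = false;
    void wake_up() {
        if (throws) throw std::bad_alloc();
        woken = true;
    }
};

// Per-task exception handling: a failure for one waiter is contained and
// compensated, and iteration continues to the next waiter.
void wake_all(std::vector<std::weak_ptr<Waiter>>& waiters,
              const std::function<void()>& cancel_fragment) {
    for (auto& w : waiters) {
        if (auto t = w.lock()) {
            try {
                t->wake_up();
            } catch (...) {
                // Compensate for this task (e.g. cancel its fragment), then
                // keep going so later waiters are not stranded.
                cancel_fragment();
            }
        }
    }
}
```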
/review

run buildall
Findings:
- be/src/exec/pipeline/dependency.cpp: the new `wake_up()` error path leaves the task in an inconsistent lifecycle state. `PipelineTask::wake_up()` clears `_blocked_dep` and transitions the task to `RUNNABLE` before calling `submit()`. If `submit()` returns non-OK here, `set_ready()` now only cancels the fragment and returns. At that point the task is no longer in `_blocked_task`, is no longer blocked on the dependency, and was never enqueued, so no worker thread will ever run `close_task()` / `decrement_running_task()` for it. The fragment cancel path only calls `unblock_all_dependencies()`; it does not schedule or close this stranded task. This is a real hang / leaked-running-task regression on the exact scheduler-failure path the old `THROW_IF_ERROR` propagated to the caller.
- be/src/exec/pipeline/dependency.cpp: the new `std::exception` catch still aborts the BE process via `CHECK(false)`. That contradicts the PR goal of handling `std::bad_alloc` and other standard exceptions gracefully. On the OOM/logging/cancel failure path described in the PR body, this change still turns the query-local failure into a process crash.
Critical checkpoint conclusions:
- Goal of current task: make `Dependency::set_ready()` non-throwing and avoid tasks hanging when wake-up/recovery fails. Conclusion: not achieved; the new `submit()` failure handling can strand a task permanently, and the `std::exception` branch still aborts the process. No test in this PR demonstrates the new behavior.
- Minimal / focused change: mostly focused to `Dependency::set_ready()`, but the behavioral change is not safely integrated with the surrounding task lifecycle.
- Concurrency: applicable. `Dependency::_blocked_task`, `PipelineTask::_blocked_dep`, and task scheduler queues are part of the wake-up handshake. The new code keeps the lock scope lightweight, but it breaks the state-machine contract when `wake_up()` fails after mutating task state.
- Lifecycle management: applicable. Task lifecycle is `BLOCKED -> RUNNABLE -> scheduled/closed`; the new path can stop after `RUNNABLE` without ever reaching scheduler ownership or close/finalize.
- Configuration items: not applicable.
- Compatibility changes: not applicable.
- Parallel code paths: applicable. Other wake-up/unblock paths (`PipelineFragmentContext::cancel()`, `Pipeline::make_all_runnable()`, `PipelineTask::unblock_all_dependencies()`) still assume dependency wake-up either succeeds or propagates failure; they do not repair the stranded-task case introduced here.
- Special conditional checks: applicable. The new `if (!st.ok())` recovery branch needs stronger guarantees or compensation logic, which are currently missing.
- Test coverage: insufficient. I did not find a regression test or unit test covering failed `wake_up()` / failed scheduler submission / exception during recovery.
- Observability: partial. The warning log includes dependency/task/status, which is good, but observability does not compensate for the stranded-task bug or the process-abort path.
- Transaction / persistence: not applicable.
- Data writes / modifications: not applicable.
- FE/BE variable passing: not applicable.
- Performance: no material concern in the patch itself; correctness/lifecycle issues dominate.
- Other issues: none beyond the two findings above.
Overall opinion: the PR should not be merged as-is because it introduces a task-lifecycle regression on wake-up failure and does not actually provide graceful handling for std::exception / OOM paths.
```cpp
if (auto t = task.lock()) {
    std::unique_lock<std::mutex> lc(_task_lock);
    auto st = t->wake_up(this, lc);
    if (!st.ok()) {
```
`PipelineTask::wake_up()` is not an atomic "enqueue me" operation. It first clears `_blocked_dep` and transitions the task to `RUNNABLE`, and only then calls `submit()` (pipeline_task.cpp:1047-1055). If `submit()` returns non-OK here, this new branch only cancels the fragment and returns. The task has already been removed from `_blocked_task`, is no longer blocked on this dependency, and was never inserted into the scheduler queue, so no worker thread will ever call `close_task()` / `decrement_running_task()` for it. `PipelineFragmentContext::cancel()` only calls `unblock_all_dependencies()`, which does not repair that stranded-task state. This turns a scheduler failure into a potential permanent hang/leaked running-task count.
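The stranded-task hazard comes from mutating state before the fallible `submit()` call. A minimal sketch of the needed compensation, using illustrative types rather than the real Doris classes: if `submit()` fails after the task has already been moved to `RUNNABLE`, the wake-up path itself must close the task and release its running-task slot, because no worker thread will ever do it.

```cpp
#include <cassert>

// Hypothetical stand-in for the task scheduler's submit path.
struct Scheduler {
    bool fail_submit = false;
    int queued = 0;
    bool submit() {
        if (fail_submit) return false;
        ++queued;
        return true;
    }
};

struct Task {
    enum class State { BLOCKED, RUNNABLE, FINISHED } state = State::BLOCKED;
    int running_tasks = 1;  // stands in for the fragment's running-task count

    // Returns true if the task ended up owned by the scheduler.
    bool wake_up(Scheduler& sched) {
        state = State::RUNNABLE;      // state is mutated *before* submit()
        if (sched.submit()) return true;
        // Compensation on submit failure: close the task and release its
        // running slot here, since no worker thread will ever run it.
        state = State::FINISHED;
        --running_tasks;
        return false;
    }
};
```

The design point is that every exit path leaves the task either scheduler-owned or fully closed; there is no state where it is runnable but unreachable.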
run buildall

BE UT Coverage Report: Increment line coverage / Increment coverage report

BE Regression && UT Coverage Report: Increment line coverage / Increment coverage report

update update fmt update

run buildall

/review
Found 1 issue.
be/src/exec/pipeline/pipeline_task.h: the `wake_up()` signature change was not propagated to every call site. be/test/exec/pipeline/pipeline_task_test.cpp:557 still calls `task->wake_up(dep.get(), lc).ok()`, so this PR does not compile as-is.
Critical checkpoint conclusions:
- Goal of the task: The goal is to make `Dependency::set_ready()` stop propagating `Status` from `wake_up()`. The implementation is close, but the remaining stale test call means the change set does not currently build, so the goal is not fully accomplished.
- Modification size/focus: Yes. The patch is small and focused on the wake-up/error-handling path.
- Concurrency: I reviewed the `Dependency::set_ready() -> PipelineTask::wake_up()` path. It remains concurrency-sensitive, but I did not confirm a separate lock-order regression from this diff beyond the build break.
- Special lifecycle/static initialization: No special lifecycle or static initialization issue is introduced in the reviewed code.
- Configuration changes: None.
- Compatibility/storage format changes: None.
- Parallel code paths: The production caller in `Dependency::set_ready()` was updated, but the unit test still uses the old return contract, so not all relevant paths were updated.
- Special conditional checks: The `FINISHED`/`FINALIZED` guard remains justified by the existing comment and matches the intended no-resubmit behavior.
- Test coverage: Incomplete. The updated signature currently breaks an existing unit test call site, so there is no evidence of passing coverage for this refactor.
- Observability: No additional logging or metrics appear necessary for this narrow refactor.
- Transaction/persistence/data-write concerns: Not applicable.
- FE-BE variable passing: Not applicable.
- Performance: No obvious steady-state performance regression is visible in the reviewed diff.
- Other issues: None beyond the compile break above.
```diff
 }

-Status wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* dep_lock */);
+void wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* dep_lock */);
```
This signature change is not fully propagated yet. be/test/exec/pipeline/pipeline_task_test.cpp:557 still does `EXPECT_TRUE(task->wake_up(dep.get(), lc).ok());`, so the PR no longer compiles because `wake_up()` now returns `void`. Please update that remaining test call site together with the two lines already changed above.
run buildall

BE Regression && UT Coverage Report: Increment line coverage / Increment coverage report

run buildall

PR approved by at least one committer and no changes requested.

PR approved by anyone and no changes requested.

BE UT Coverage Report: Increment line coverage / Increment coverage report

BE Regression && UT Coverage Report: Increment line coverage / Increment coverage report
This pull request refactors the `PipelineTask::wake_up` method to improve error handling and simplify its interface. The method now returns `void` instead of `Status`, and errors are handled internally by canceling the associated fragment if necessary. Corresponding updates are made throughout the codebase and tests to accommodate this change.

Core Refactoring and Error Handling:
- Changed `PipelineTask::wake_up` from returning `Status` to `void`, and updated its implementation to handle errors by canceling the fragment context internally, rather than propagating `Status` up the call stack. [1] [2]
- Updated the `Dependency::set_ready` method to call the new `wake_up` signature without error checking, since errors are now handled inside `wake_up`.

Test Adjustments:
- Updated pipeline_task_test.cpp to remove checks on the return value of `wake_up` and instead directly call the new `void` method. [1] [2]
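The final contract described above can be sketched in isolation. This is an illustrative model only (the names `FragmentContext`, `Task`, `submit` are stand-ins, not the Doris API): `wake_up()` returns `void` and converts a failed submission into a fragment cancellation internally, so callers like `Dependency::set_ready()` no longer check a `Status`.

```cpp
#include <cassert>
#include <string>

// Hypothetical fragment context that records cancellation.
struct FragmentContext {
    bool cancelled = false;
    std::string reason;
    void cancel(const std::string& why) {
        cancelled = true;
        reason = why;
    }
};

struct Task {
    FragmentContext* frag;
    bool submitted = false;
    bool fail_submit = false;

    bool submit() {
        if (fail_submit) return false;
        submitted = true;
        return true;
    }

    // New contract: void return; errors are handled here by cancelling the
    // fragment rather than being propagated up the call stack.
    void wake_up() {
        if (!submit()) {
            frag->cancel("wake_up failed: scheduler submit error");
        }
    }
};
```

With this shape, the caller side collapses from `EXPECT_TRUE(task->wake_up(...).ok())`-style checks to a plain call, matching the test adjustments listed above.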