[Bug](pipeline) fix wake up early without terminate call #61679
BiteTheDDDDt wants to merge 2 commits into apache:master
Pull request overview
Fixes a pipeline-task race where PipelineTask::execute() could mark a task done without ensuring operator termination when concurrent state changes occur, and adds a targeted unit test using a debug point to reproduce the scenario deterministically.
Changes:
- Reordered `PipelineTask::execute()` deferred logic so operator termination is triggered reliably when `_wake_up_early` is observed after the pending-finish check.
- Added a debug point to simulate the problematic interleaving for test reproducibility.
- Added a new unit test (`TEST_TERMINATE_RACE_FIX`) and minor formatting cleanup in an existing test.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| be/src/exec/pipeline/pipeline_task.cpp | Reorders termination vs. pending-finish checks and introduces a debug point to simulate the race. |
| be/test/exec/pipeline/pipeline_task_test.cpp | Adds a regression test for the race using debug points; minor readability formatting tweak. |

    config::enable_debug_points = true;
    DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
    {

This test flips the global config::enable_debug_points flag but restores it to false unconditionally. If another test (or future setup) enables debug points, this will leak state across tests. Prefer capturing the original value and restoring it (e.g., via RAII/Defer), similar to SpillableDebugPointHelper in be/test/exec/operator/spillable_operator_test_helper.h.
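The save-and-restore idea in this comment can be sketched as a small RAII guard. This is only an illustration: `ScopedOverride` and `g_enable_debug_points` are hypothetical names standing in for a guard type and for `config::enable_debug_points`, and the sketch does not reproduce `SpillableDebugPointHelper`.

```cpp
#include <cassert>
#include <utility>

// Hypothetical stand-in for a global test flag such as config::enable_debug_points.
inline bool g_enable_debug_points = false;

// RAII guard: remembers the current value, installs a temporary one,
// and restores the remembered value when the guard goes out of scope.
template <typename T>
class ScopedOverride {
public:
    ScopedOverride(T& target, T temporary)
            : _target(target), _saved(std::exchange(target, std::move(temporary))) {}
    ~ScopedOverride() { _target = std::move(_saved); }

    ScopedOverride(const ScopedOverride&) = delete;
    ScopedOverride& operator=(const ScopedOverride&) = delete;

private:
    T& _target;
    T _saved;
};

// Usage: the flag is restored to whatever it was before, not forced to false.
inline void demo_scoped_override() {
    g_enable_debug_points = true;  // pretend an earlier setup step enabled it
    {
        ScopedOverride<bool> guard(g_enable_debug_points, true);
        assert(g_enable_debug_points);
    }
    assert(g_enable_debug_points);  // still true after the guard is gone
    g_enable_debug_points = false;
}
```

Because the destructor also runs when an assertion macro returns early from the test body, the flag would be restored on every exit path.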

        EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
        EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
    }
    DebugPoints::instance()->clear();

DebugPoints::instance()->clear() clears all debug points globally, which can interfere with other tests in the same binary if they rely on debug points. Since this test only adds one point, prefer DebugPoints::instance()->remove("PipelineTask::execute.wake_up_early_in_else_if") and leave unrelated debug points intact.
Suggested change:

    - DebugPoints::instance()->clear();
    + DebugPoints::instance()->remove("PipelineTask::execute.wake_up_early_in_else_if");

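To illustrate why removing a single point is safer than clearing the registry, here is a minimal sketch built on a toy registry. `ToyDebugPoints` and `ScopedDebugPoint` are hypothetical types written for this example and are not the Doris `DebugPoints` class; only the debug point name is taken from the PR.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>

// Toy registry, NOT the Doris DebugPoints class: just enough to show scoping.
class ToyDebugPoints {
public:
    void add(const std::string& name) { _names.insert(name); }
    void remove(const std::string& name) { _names.erase(name); }
    void clear() { _names.clear(); }
    bool is_enabled(const std::string& name) const { return _names.count(name) > 0; }

private:
    std::set<std::string> _names;
};

// Enables one named point for the guard's lifetime and removes only that point.
class ScopedDebugPoint {
public:
    ScopedDebugPoint(ToyDebugPoints& registry, std::string name)
            : _registry(registry), _name(std::move(name)) {
        _registry.add(_name);
    }
    ~ScopedDebugPoint() { _registry.remove(_name); }

    ScopedDebugPoint(const ScopedDebugPoint&) = delete;
    ScopedDebugPoint& operator=(const ScopedDebugPoint&) = delete;

private:
    ToyDebugPoints& _registry;
    std::string _name;
};

inline void demo_scoped_debug_point() {
    ToyDebugPoints points;
    points.add("unrelated.point");  // owned by some other test
    {
        ScopedDebugPoint guard(points, "PipelineTask::execute.wake_up_early_in_else_if");
        assert(points.is_enabled("PipelineTask::execute.wake_up_early_in_else_if"));
    }
    assert(!points.is_enabled("PipelineTask::execute.wake_up_early_in_else_if"));
    assert(points.is_enabled("unrelated.point"));  // untouched by the guard
}
```

One limitation of this sketch: if the named point was already enabled before the guard was created, the destructor would still remove it.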

    // Debug point for testing the race condition fix: inject set_wake_up_early() +
    // terminate() here to simulate Thread B writing A then B between Thread A's two
    // reads of _wake_up_early.
    DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
        set_wake_up_early();
        terminate();

The injected debug-point block calls terminate(), but terminate() is called again immediately afterwards when _wake_up_early is re-checked. If terminate() ever becomes non-idempotent, this debug point will change behavior (and can mask issues). Consider removing the terminate() call from the debug-point injection and only toggling _wake_up_early, or otherwise guard so terminate() runs only once per execute() call when the debug point is enabled.
Suggested change:

    - // Debug point for testing the race condition fix: inject set_wake_up_early() +
    - // terminate() here to simulate Thread B writing A then B between Thread A's two
    - // reads of _wake_up_early.
    - DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
    -     set_wake_up_early();
    -     terminate();
    + // Debug point for testing the race condition fix: inject set_wake_up_early() here
    + // to simulate Thread B writing A then B between Thread A's two reads of
    + // _wake_up_early. terminate() will be invoked once below when _wake_up_early is
    + // re-checked, to avoid double-calling terminate() in the debug path.
    + DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
    +     set_wake_up_early();

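The alternative mentioned in this comment, guarding `terminate()` so that its work runs only once, could look roughly like the sketch below. `OnceTask` is a hypothetical toy class; whether the real `PipelineTask::terminate()` is idempotent is not established in this thread.

```cpp
#include <atomic>
#include <cassert>

// Toy task, not the Doris PipelineTask: terminate() does its work at most once.
class OnceTask {
public:
    void terminate() {
        // exchange() returns the previous value, so only the first caller sees false.
        if (_terminated.exchange(true)) {
            return;
        }
        _work_count.fetch_add(1);  // stands in for the real termination work
    }

    int work_count() const { return _work_count.load(); }

private:
    std::atomic<bool> _terminated{false};
    std::atomic<int> _work_count{0};
};

inline void demo_terminate_once() {
    OnceTask task;
    task.terminate();  // e.g. injected by the debug point
    task.terminate();  // e.g. the re-check of the wake-up flag
    assert(task.work_count() == 1);
}
```

With such a guard, a debug point that calls `terminate()` and a later re-check that calls it again would behave the same as a single call.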

    // (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
    //
    // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
    // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
    // then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
    // then set *done=true without ever calling operator terminate(), causing close() to run
    // on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
    // WAITING_FOR_SYNCED_SIZE state when insert() is called).
    //
    // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
    // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
    // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.

The explanatory comment says _is_pending_finish() becomes false “due to _always_ready”, but _is_pending_finish() ultimately checks Dependency::_ready via Dependency::is_blocked_by(). set_always_ready() does set _always_ready, but it unblocks tasks by calling set_ready() (setting _ready=true). Consider rewording this comment to reference _ready/set_ready() so the described mechanism matches the actual code path.
Suggested change:

    - // (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
    - //
    - // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
    - // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
    - // then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
    - // then set *done=true without ever calling operator terminate(), causing close() to run
    - // on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
    - // WAITING_FOR_SYNCED_SIZE state when insert() is called).
    - //
    - // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
    - // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
    - // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
    + // (A) set_wake_up_early() -> (B) terminate() [marks the finish dependency ready
    + // via set_always_ready()/set_ready(), which sets Dependency::_ready=true]
    + //
    + // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
    + // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
    + // then Thread A reads _is_pending_finish()=false (because the finish dependency's
    + // underlying Dependency::_ready flag is now true). Thread A would then set *done=true
    + // without ever calling operator terminate(), causing close() to run on operators that
    + // were never properly terminated (e.g. RuntimeFilterProducer still in
    + // WAITING_FOR_SYNCED_SIZE state when insert() is called).
    + //
    + // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
    + // if Thread A observes B's effect (Dependency::_ready=true on the finish dependency), it
    + // is guaranteed to also observe A's effect (_wake_up_early=true) on this second read,
    + // ensuring terminate() is called.

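The interleaving described in this comment can be replayed deterministically in a toy model. The sketch below is an assumption-laden illustration, not the code in `pipeline_task.cpp`: `ToyPipelineTask`, its fields, and the `between_checks` hook (which plays the role of the debug point) are all hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Toy model, not Doris code. Field names only mirror the discussion above.
struct ToyPipelineTask {
    std::atomic<bool> wake_up_early{false};     // effect (A)
    std::atomic<bool> finish_dep_ready{false};  // effect (B)
    bool operators_terminated = false;
    bool done = false;
    // Test hook playing the role of the debug point: runs between the checks.
    std::function<void()> between_checks;

    // What "Thread B" does: write A, then B.
    void make_runnable() {
        wake_up_early.store(true);
        finish_dep_ready.store(true);
    }

    // Old order: a single read of A, then the read of B.
    void finish_old() {
        if (wake_up_early.load()) {
            operators_terminated = true;
            done = true;
            return;
        }
        if (between_checks) between_checks();
        if (finish_dep_ready.load()) {
            done = true;  // marked done although terminate never ran
        }
    }

    // New order: read B, then re-read A before marking the task done.
    void finish_new() {
        if (wake_up_early.load()) {
            operators_terminated = true;
            done = true;
            return;
        }
        if (between_checks) between_checks();
        if (finish_dep_ready.load()) {
            if (wake_up_early.load()) operators_terminated = true;
            done = true;
        }
    }
};

inline void demo_interleaving() {
    ToyPipelineTask old_task;
    old_task.between_checks = [&old_task] { old_task.make_runnable(); };
    old_task.finish_old();
    assert(old_task.done && !old_task.operators_terminated);  // the bug

    ToyPipelineTask new_task;
    new_task.between_checks = [&new_task] { new_task.make_runnable(); };
    new_task.finish_new();
    assert(new_task.done && new_task.operators_terminated);  // the fix
}
```

In this model the old order finishes the task with its operators never terminated, while the re-read in the new order catches the late write of (A).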
Code Review Summary
This PR fixes a real race condition in PipelineTask::execute() where Pipeline::make_all_runnable() writing set_wake_up_early() (A) then terminate() (B) could interleave with the Defer block's reads such that Thread A observes _always_ready=true (from B) but misses _wake_up_early=true (from A), resulting in *done=true without operator terminate() being called.
Critical Checkpoint Conclusions
Goal and correctness: The goal is to ensure terminate() is always called on operators when _wake_up_early is set, even when concurrent make_all_runnable() writes interleave with the Defer block reads. The fix achieves this by reordering the reads: first check _is_pending_finish() (which reads _always_ready, effect B), then check _wake_up_early (effect A). Under seq_cst ordering (the default for std::atomic<bool>), if the first read observes B's effect, the later read of _wake_up_early must also observe A's effect, since A was written before B in Thread B's program order. The logic is sound.
Modification minimality: The change is focused and minimal — only the Defer block in execute() is restructured, plus a debug point and test.
Concurrency analysis:
- `_wake_up_early` is `std::atomic<bool>` (seq_cst by default): correct.
- `_always_ready` in `Dependency` is `std::atomic<bool>` (seq_cst by default): correct.
- The write order in `make_all_runnable()` (two separate loops: first `set_wake_up_early` on all tasks, then `terminate` on all tasks) guarantees A happens-before B per program order, which combined with seq_cst gives the total order the comment describes.
- The read order in the Defer (first `_is_pending_finish()` reading `_always_ready`, then `if (_wake_up_early)`) correctly exploits this total order.
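The ordering argument in the list above can be exercised with a small two-thread litmus test. This is a sketch under stated assumptions: `count_ordering_violations` is a hypothetical helper, and the two local atomics merely stand in for `_wake_up_early` and the finish-dependency flag.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Returns how many rounds observed B without A. The C++ memory model
// guarantees this is 0 for default (seq_cst) atomic stores and loads.
inline int count_ordering_violations(int rounds) {
    int violations = 0;
    for (int i = 0; i < rounds; ++i) {
        std::atomic<bool> a{false};  // stands in for _wake_up_early
        std::atomic<bool> b{false};  // stands in for the finish-dependency flag

        std::thread writer([&] {
            a.store(true);  // (A), seq_cst by default
            b.store(true);  // (B), sequenced after (A)
        });

        bool saw_b_without_a = false;
        std::thread reader([&] {
            while (!b.load()) {
                std::this_thread::yield();  // wait until (B) is visible
            }
            saw_b_without_a = !a.load();  // (A) must already be visible here
        });

        writer.join();
        reader.join();
        if (saw_b_without_a) ++violations;
    }
    return violations;
}

inline void demo_ordering() {
    assert(count_ordering_violations(50) == 0);
}
```

A passing run does not prove the property; the guarantee comes from the language rules, and the test only illustrates them. The same reader-after-writer pattern would also hold with a release store to `b` paired with an acquire load of `b`.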
Parallel code paths: The do_revoke_memory() path at line 763 also has if (_wake_up_early) { terminate(); ... } in its Defer, but it does NOT have the _is_pending_finish() check, so it doesn't have the same race window. No action needed there.
Test coverage: The new TEST_TERMINATE_RACE_FIX test uses a debug point to deterministically inject the race timing. This is a pragmatic approach since the real race is timing-dependent. The test verifies that operator _terminated is set even when the debug point fires between the two reads. Good coverage.
Pre-existing issue (advisory, not introduced by this PR): THROW_IF_ERROR at lines 508-509 runs inside a Defer destructor. Per be/src/common/AGENTS.md, THROW_IF_ERROR should be kept out of Defer and destructors — WARN_IF_ERROR is recommended instead. However, this is a pre-existing pattern (the original code had the same THROW_IF_ERROR calls in the same Defer block), and the Doris Defer implementation handles this safely via std::uncaught_exceptions() check. Flagged as advisory inline.
Configuration items: None added.
Incompatible changes: None.
Observability: No new observability needed; the existing debug point is sufficient for testing.
Performance: No performance impact — the fix adds one additional atomic read of _wake_up_early in the Defer block, which is negligible.
Other issues: None found. The fix is correct and well-documented with thorough comments explaining the ordering invariant.

    // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
    if (_wake_up_early) {
        terminate();
        THROW_IF_ERROR(_root->terminate(_state));

Advisory (pre-existing): Per be/src/common/AGENTS.md, THROW_IF_ERROR should be kept out of Defer blocks and destructors — the recommendation is to use WARN_IF_ERROR instead. While the Doris Defer implementation handles this safely (it checks std::uncaught_exceptions() and swallows exceptions during stack unwinding), during unwinding any error from _root->terminate() or _sink->terminate() would be silently lost.
This is a pre-existing pattern (the original code had the same calls in the same Defer), so it's not a blocker for this PR. But since the code is being moved, this would be a good opportunity to switch to WARN_IF_ERROR if operator terminate failures are truly best-effort cleanup.
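The trade-off described here, where an error raised inside a deferred block is lost during stack unwinding, can be shown with a toy deferred-action class. `ToyDefer` and `run_with_defer` are hypothetical and written only for this example; the actual Doris `Defer` implementation may differ in how it consults `std::uncaught_exceptions()`.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>
#include <utility>

// Toy deferred action, NOT the Doris Defer class.
template <typename F>
class ToyDefer {
public:
    explicit ToyDefer(F fn)
            : _fn(std::move(fn)), _exceptions_on_entry(std::uncaught_exceptions()) {}

    ~ToyDefer() noexcept(false) {
        if (std::uncaught_exceptions() > _exceptions_on_entry) {
            // Unwinding: letting a second exception escape would call
            // std::terminate, so the error from the deferred action is swallowed.
            try {
                _fn();
            } catch (...) {
            }
        } else {
            _fn();  // normal exit: the error is allowed to propagate
        }
    }

    ToyDefer(const ToyDefer&) = delete;
    ToyDefer& operator=(const ToyDefer&) = delete;

private:
    F _fn;
    int _exceptions_on_entry;
};

// Reports which error, if any, reaches the caller.
inline std::string run_with_defer(bool body_throws) {
    try {
        ToyDefer cleanup([] { throw std::runtime_error("terminate failed"); });
        if (body_throws) throw std::runtime_error("body failed");
    } catch (const std::exception& e) {
        return e.what();
    }
    return "no error";
}

inline void demo_defer() {
    assert(run_with_defer(false) == "terminate failed");  // error surfaces
    assert(run_with_defer(true) == "body failed");        // cleanup error is lost
}
```

In the second case the caller never learns that the deferred action failed, which is the silent loss the advisory refers to; logging the failure instead of throwing would at least leave a trace.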
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
This pull request addresses a subtle race condition in the pipeline task execution logic and adds a targeted test to verify the fix. The main improvement ensures that operator termination is reliably triggered even in the presence of concurrent state changes, preventing operators from being left in an inconsistent state. Additionally, the pull request introduces a debug point for precise testing and includes minor test code cleanups.
Race condition fix and test coverage:
- Fixed a race condition in `PipelineTask::execute()` by reordering the logic to ensure `terminate()` is always called if required, even when another thread updates task state between checks. Added a debug point to simulate the race for testing.
- Added a new test, `TEST_TERMINATE_RACE_FIX`, in `pipeline_task_test.cpp` that uses the debug point to reliably reproduce and verify the race condition fix, ensuring operator termination is not skipped.

Test infrastructure and cleanup:
- Included `debug_points.h` and `common/config.h` in `pipeline_task_test.cpp` to support debug point injection and configuration toggling for the new test.