[fix](pipeline) Fix wake up early without terminate call #63539
BiteTheDDDDt wants to merge 1 commit into
```
Thread A (executing HashJoin build task)      Thread B (all downstream pipelines finished)
────────────────────────────────────────      ────────────────────────────────────────────
Defer starts running:
  line 475: read _wake_up_early → false
                                              decrement_running_task() triggers
                                              make_all_runnable():
                                                line 127: set_wake_up_early() → true
                                                line 132: terminate()
                                                  → finish_dep.set_always_ready()
  line 481: else if (_eos && !_spilling &&
                     !_is_pending_finish())
    _is_pending_finish() = false   ← because of always_ready!
  line 483: *done = true
    ← note: _sink->terminate() was never called!
close_task():
  task->close(OK):
```
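The interleaving above can be reproduced in a small single-threaded model. This is only a sketch under assumptions: `ToyTask`, `downstream_finished()`, `finish_old_order()` and `finish_new_order()` are hypothetical names invented for illustration, and the bodies are a simplification of the ordering described in this PR, not the actual `PipelineTask::execute()` code. The `interleave` callback stands in for Thread B running between Thread A's two checks.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Hypothetical, heavily simplified stand-in for a pipeline task.
struct ToyTask {
    std::atomic<bool> wake_up_early{false};
    std::atomic<bool> finish_dep_ready{false};  // false means "still pending finish"
    bool eos = true;
    bool terminated = false;

    bool is_pending_finish() const { return !finish_dep_ready.load(); }
    void terminate() { terminated = true; }

    // Thread B's side: set the early wake-up flag, then unblock the finish dependency.
    void downstream_finished() {
        wake_up_early.store(true);
        finish_dep_ready.store(true);
    }

    // Old order: terminate() depends only on the first read of wake_up_early.
    bool finish_old_order(const std::function<void()>& interleave) {
        bool done = false;
        const bool early = wake_up_early.load();  // first (possibly stale) read
        interleave();                             // Thread B may run here
        if (early) {
            terminate();
            done = true;
        } else if (eos && !is_pending_finish()) {
            done = true;                          // finishes without terminate()
        }
        return done;
    }

    // New order: wake_up_early is read again after the pending-finish check.
    bool finish_new_order(const std::function<void()>& interleave) {
        bool done = false;
        const bool early = wake_up_early.load();
        interleave();
        if (early || (eos && !is_pending_finish())) {
            done = true;
        }
        if (wake_up_early.load()) {               // second read sees Thread B's write
            terminate();
        }
        return done;
    }
};
```

Calling `finish_old_order` with a callback that invokes `downstream_finished()` returns `true` while leaving `terminated` false; `finish_new_order` with the same callback returns `true` with `terminated` set. The second read works in this model because Thread B sets the flag before it makes the dependency ready, so a thread that observes the dependency as ready also observes the flag.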
This pull request addresses a subtle race condition in the pipeline task
execution logic and adds a targeted test to verify the fix. The main
improvement ensures that operator termination is reliably triggered even
in the presence of concurrent state changes, preventing operators from
being left in an inconsistent state. Additionally, the pull request
introduces a debug point for precise testing and includes minor test
code cleanups.
**Race condition fix and test coverage:**
* Fixed a race condition in `PipelineTask::execute()` by reordering the
logic to ensure `terminate()` is always called if required, even when
another thread updates task state between checks. Added a debug point to
simulate the race for testing.
* Added a new test `TEST_TERMINATE_RACE_FIX` in `pipeline_task_test.cpp`
that uses the debug point to reliably reproduce and verify the race
condition fix, ensuring operator termination is not skipped.
**Test infrastructure and cleanup:**
* Included `debug_points.h` and `common/config.h` in
`pipeline_task_test.cpp` to support debug point injection and
configuration toggling for the new test.
* Minor formatting cleanup in an existing test case for readability.
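For readers unfamiliar with the debug point technique mentioned above, the general idea can be sketched as a named hook that does nothing unless a test has enabled it. The registry below is a hypothetical minimal version written for illustration (`ToyDebugPoints` and `toy_debug_point` are invented names); it is not the implementation behind Doris's `DebugPoints` class, and its interface is an assumption.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_set>

// Hypothetical registry of active debug point names (illustration only).
class ToyDebugPoints {
public:
    static ToyDebugPoints& instance() {
        static ToyDebugPoints self;
        return self;
    }
    void add(const std::string& name) {
        std::lock_guard<std::mutex> guard(_mu);
        _active.insert(name);
    }
    void clear() {
        std::lock_guard<std::mutex> guard(_mu);
        _active.clear();
    }
    bool is_active(const std::string& name) {
        std::lock_guard<std::mutex> guard(_mu);
        return _active.count(name) > 0;
    }

private:
    std::mutex _mu;
    std::unordered_set<std::string> _active;
};

// Runs `action` only when a test has activated the named point, so production
// paths are unaffected while tests can force a specific interleaving.
inline void toy_debug_point(const std::string& name, const std::function<void()>& action) {
    if (ToyDebugPoints::instance().is_active(name)) {
        action();
    }
}
```

A test would call `add()` with the point's name, run the code under test, then call `clear()`, which mirrors the add/execute/clear shape of the new test in this PR.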
(cherry picked from commit 2b9a1a5)
Pull request overview
This PR backports the fix from #61679 to branch-4.0 to address a pipeline task race where an early wake-up can cause a task to finish without calling operator terminate(), leaving some operators (e.g., runtime filter producers) in an invalid state during close.
Changes:
- Reorders termination logic in `PipelineTask::execute()` so operator termination happens after the pending-finish check, ensuring early wake-up is observed before close.
- Adds a debug point to deterministically simulate the problematic interleaving.
- Adds a new unit test (`TEST_TERMINATE_RACE_FIX`) to reproduce and validate the race fix.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| be/src/pipeline/pipeline_task.cpp | Reorders termination relative to _is_pending_finish() and adds a debug point hook to simulate the race. |
| be/test/pipeline/pipeline_task_test.cpp | Adds a new unit test that uses the debug point to validate operators are terminated under the targeted race. |
    sink_finish_dep->set_ready();
    config::enable_debug_points = true;
    DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
    {
        bool done = false;
        EXPECT_TRUE(task->execute(&done).ok());
        EXPECT_TRUE(task->_eos);
        EXPECT_TRUE(done);
        EXPECT_TRUE(task->_wake_up_early);
        EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
        EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
    }
    DebugPoints::instance()->clear();
    config::enable_debug_points = false;
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
run buildall
What problem does this PR solve?
Issue Number: None
Related PR: #61679
Problem Summary: Backport #61679 to branch-4.0. A pipeline task can race with downstream early wake-up: one thread may observe `_wake_up_early` as false, then another thread sets `_wake_up_early` and unblocks finish dependencies, and the first thread later sees `_is_pending_finish()` as false and finishes without calling operator `terminate()`. For hash join build tasks this can leave runtime filter producers in `WAITING_FOR_SYNCED_SIZE`; during close/build, `insert()` expects `WAITING_FOR_DATA` and reports an invalid runtime filter producer state. This change moves operator termination after the pending-finish check so the second `_wake_up_early` read observes the early wake-up and terminates operators before close.
Release note
None
Check List (For Author)
- `build-support/check-format.sh be/src/pipeline/pipeline_task.cpp be/test/pipeline/pipeline_task_test.cpp`
- `ninja -C be/build_Release src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o`
- `ninja -C be/ut_build_ASAN src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o test/CMakeFiles/doris_be_test.dir/exec/pipeline/pipeline_task_test.cpp.o test/doris_be_test`
- `be/ut_build_ASAN/test/doris_be_test --gtest_filter=PipelineTaskTest.TEST_TERMINATE_RACE_FIX --gtest_print_time=true`
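The runtime filter failure described in the problem summary can be pictured with a toy state machine. Everything here is an assumption made for illustration: `ToyProducer` and `ToyBuildSink` are hypothetical types, only the two state names come from the summary, and the idea that a terminated sink skips the `insert()` step on close is a guess at the mechanism rather than a description of the real hash join build sink.

```cpp
#include <cassert>

// Only these two state names appear in the PR text; the real producer may have more.
enum class ToyState { WAITING_FOR_SYNCED_SIZE, WAITING_FOR_DATA };

// Hypothetical producer: insert() is only valid in WAITING_FOR_DATA.
struct ToyProducer {
    ToyState state = ToyState::WAITING_FOR_SYNCED_SIZE;
    // Returns false to model the "invalid runtime filter producer state" error.
    bool insert() const { return state == ToyState::WAITING_FOR_DATA; }
};

// Hypothetical build sink that owns one producer.
struct ToyBuildSink {
    ToyProducer producer;
    bool terminated = false;

    void terminate() { terminated = true; }

    // Assumption: a terminated sink skips the build step on close, so the
    // producer's state is never checked by insert().
    bool close() {
        if (terminated) {
            return true;
        }
        return producer.insert();
    }
};
```

In this model, closing a sink that was woken up early but never terminated fails because the producer is still in `WAITING_FOR_SYNCED_SIZE`, while calling `terminate()` first lets `close()` succeed.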