diff --git a/be/src/exec/pipeline/pipeline_task.cpp b/be/src/exec/pipeline/pipeline_task.cpp index ffd1817683f685..1b82530ebb8b03 100644 --- a/be/src/exec/pipeline/pipeline_task.cpp +++ b/be/src/exec/pipeline/pipeline_task.cpp @@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) { // If task is woke up early, we should terminate all operators, and this task could be closed immediately. if (_wake_up_early) { - terminate(); - THROW_IF_ERROR(_root->terminate(_state)); - THROW_IF_ERROR(_sink->terminate(_state)); _eos = true; *done = true; } else if (_eos && !_spilling && (fragment_context->is_canceled() || !_is_pending_finish())) { + // Debug point for testing the race condition fix: inject set_wake_up_early() + + // terminate() here to simulate Thread B writing A then B between Thread A's two + // reads of _wake_up_early. + DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", { + set_wake_up_early(); + terminate(); + }); *done = true; } + + // NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check + // above, not before. This ordering is critical to avoid a race condition: + // + // Pipeline::make_all_runnable() writes in this order: + // (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready] + // + // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a + // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B, + // then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would + // then set *done=true without ever calling operator terminate(), causing close() to run + // on operators that were never properly terminated (e.g. RuntimeFilterProducer still in + // WAITING_FOR_SYNCED_SIZE state when insert() is called). + // + // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A), + // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe + // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called. + if (_wake_up_early) { + terminate(); + THROW_IF_ERROR(_root->terminate(_state)); + THROW_IF_ERROR(_sink->terminate(_state)); + } }}; const auto query_id = _state->query_id(); // If this task is already EOS and block is empty (which means we already output all blocks), diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp index b47f00af89f5f1..db61819c4dac12 100644 --- a/be/test/exec/pipeline/pipeline_task_test.cpp +++ b/be/test/exec/pipeline/pipeline_task_test.cpp @@ -18,6 +18,7 @@ #include #include +#include "common/config.h" #include "common/status.h" #include "exec/operator/operator.h" #include "exec/operator/spill_utils.h" @@ -32,6 +33,7 @@ #include "testutil/mock/mock_runtime_state.h" #include "testutil/mock/mock_thread_mem_tracker_mgr.h" #include "testutil/mock/mock_workload_group_mgr.h" +#include "util/debug_points.h" namespace doris { @@ -1534,4 +1536,93 @@ TEST_F(PipelineTaskTest, TEST_REVOKE_MEMORY) { } } +// Test for the race condition fix between _wake_up_early and _is_pending_finish(). +// +// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate() +// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false +// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to +// _always_ready from B), Thread A would set *done=true without calling operator terminate(). +// +// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's +// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent +// read, ensuring terminate() is always called. +// +// This test uses a debug point injected into the else-if branch to simulate the exact bad timing: +// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false +// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check. +TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) { + auto num_instances = 1; + auto pip_id = 0; + auto task_id = 0; + auto pip = std::make_shared(pip_id, num_instances, num_instances); + { + OperatorPtr source_op; + source_op.reset(new DummyOperator()); + EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok()); + + int op_id = 1; + int node_id = 2; + int dest_id = 3; + DataSinkOperatorPtr sink_op; + sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id)); + EXPECT_TRUE(pip->set_sink(sink_op).ok()); + } + auto profile = std::make_shared("Pipeline : " + std::to_string(pip_id)); + std::map, std::vector>>> + shared_state_map; + _runtime_state->resize_op_id_to_local_state(-1); + auto task = std::make_shared(pip, task_id, _runtime_state.get(), _context, + profile.get(), shared_state_map, task_id); + task->_exec_time_slice = 10'000'000'000ULL; + { + std::vector scan_range; + int sender_id = 0; + TDataSink tsink; + EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok()); + } + _query_ctx->get_execution_dependency()->set_ready(); + + // Get the sink's finish dependency and block it to simulate a pending async operation + // (e.g. runtime filter size sync RPC in flight). + auto* sink_finish_dep = + _runtime_state->get_sink_local_state()->cast().finishdependency(); + EXPECT_NE(sink_finish_dep, nullptr); + sink_finish_dep->block(); + + // Drive the task to EOS so it will enter the Defer's pending-finish check. + task->_operators.front()->cast()._eos = true; + { + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + // EOS reached but still blocked on finish dependency: not done yet. + EXPECT_TRUE(task->_eos); + EXPECT_FALSE(done); + EXPECT_FALSE(task->_wake_up_early); + } + + // Now unblock the finish dependency (simulates the async op completing) and activate the + // debug point. The debug point fires inside the else-if branch — after _is_pending_finish() + // returns false but before the second _wake_up_early read — and calls set_wake_up_early() + + // terminate(). This precisely reproduces the race where Thread B's writes land between + // Thread A's two reads of _wake_up_early. + sink_finish_dep->set_ready(); + config::enable_debug_points = true; + DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if"); + { + bool done = false; + EXPECT_TRUE(task->execute(&done).ok()); + EXPECT_TRUE(task->_eos); + EXPECT_TRUE(done); + // The key assertion: even though the task took the else-if path (not the + // if(_wake_up_early) path), operator terminate() must have been called because the + // second read of _wake_up_early correctly observed the value set by the debug point. + EXPECT_TRUE(task->_wake_up_early); + EXPECT_TRUE(task->_operators.front()->cast()._terminated); + EXPECT_TRUE(task->_sink->cast()._terminated); + } + DebugPoints::instance()->clear(); + config::enable_debug_points = false; +} + } // namespace doris