Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions be/src/exec/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) {

// If task is woke up early, we should terminate all operators, and this task could be closed immediately.
if (_wake_up_early) {
terminate();
THROW_IF_ERROR(_root->terminate(_state));
THROW_IF_ERROR(_sink->terminate(_state));
_eos = true;
*done = true;
} else if (_eos && !_spilling &&
(fragment_context->is_canceled() || !_is_pending_finish())) {
// Debug point for testing the race condition fix: inject set_wake_up_early() +
// terminate() here to simulate Thread B writing A then B between Thread A's two
// reads of _wake_up_early.
DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
set_wake_up_early();
terminate();
Comment on lines +480 to +485
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The injected debug-point block calls terminate(), but terminate() is called again immediately afterwards when _wake_up_early is re-checked. If terminate() ever becomes non-idempotent, this debug point will change behavior (and can mask issues). Consider removing the terminate() call from the debug-point injection and only toggling _wake_up_early, or otherwise guard so terminate() runs only once per execute() call when the debug point is enabled.

Suggested change
// Debug point for testing the race condition fix: inject set_wake_up_early() +
// terminate() here to simulate Thread B writing A then B between Thread A's two
// reads of _wake_up_early.
DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
set_wake_up_early();
terminate();
// Debug point for testing the race condition fix: inject set_wake_up_early() here
// to simulate Thread B writing A then B between Thread A's two reads of
// _wake_up_early. terminate() will be invoked once below when _wake_up_early is
// re-checked, to avoid double-calling terminate() in the debug path.
DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
set_wake_up_early();

Copilot uses AI. Check for mistakes.
});
*done = true;
}

// NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check
// above, not before. This ordering is critical to avoid a race condition:
//
// Pipeline::make_all_runnable() writes in this order:
// (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
//
// If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
// window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
// then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
// then set *done=true without ever calling operator terminate(), causing close() to run
// on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
// WAITING_FOR_SYNCED_SIZE state when insert() is called).
//
// By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
// if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
// A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
Comment on lines +494 to +505
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The explanatory comment says _is_pending_finish() becomes false “due to _always_ready”, but _is_pending_finish() ultimately checks Dependency::_ready via Dependency::is_blocked_by(). set_always_ready() does set _always_ready, but it unblocks tasks by calling set_ready() (setting _ready=true). Consider rewording this comment to reference _ready/set_ready() so the described mechanism matches the actual code path.

Suggested change
// (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
//
// If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
// window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
// then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
// then set *done=true without ever calling operator terminate(), causing close() to run
// on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
// WAITING_FOR_SYNCED_SIZE state when insert() is called).
//
// By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
// if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
// A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
// (A) set_wake_up_early() -> (B) terminate() [marks the finish dependency ready
// via set_always_ready()/set_ready(), which sets Dependency::_ready=true]
//
// If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
// window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
// then Thread A reads _is_pending_finish()=false (because the finish dependency's
// underlying Dependency::_ready flag is now true). Thread A would then set *done=true
// without ever calling operator terminate(), causing close() to run on operators that
// were never properly terminated (e.g. RuntimeFilterProducer still in
// WAITING_FOR_SYNCED_SIZE state when insert() is called).
//
// By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
// if Thread A observes B's effect (Dependency::_ready=true on the finish dependency), it
// is guaranteed to also observe A's effect (_wake_up_early=true) on this second read,
// ensuring terminate() is called.

Copilot uses AI. Check for mistakes.
if (_wake_up_early) {
terminate();
THROW_IF_ERROR(_root->terminate(_state));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Advisory (pre-existing): Per be/src/common/AGENTS.md, THROW_IF_ERROR should be kept out of Defer blocks and destructors — the recommendation is to use WARN_IF_ERROR instead. While the Doris Defer implementation handles this safely (it checks std::uncaught_exceptions() and swallows exceptions during stack unwinding), during unwinding any error from _root->terminate() or _sink->terminate() would be silently lost.

This is a pre-existing pattern (the original code had the same calls in the same Defer), so it's not a blocker for this PR. But since the code is being moved, this would be a good opportunity to switch to WARN_IF_ERROR if operator terminate failures are truly best-effort cleanup.

THROW_IF_ERROR(_sink->terminate(_state));
}
}};
const auto query_id = _state->query_id();
// If this task is already EOS and block is empty (which means we already output all blocks),
Expand Down
91 changes: 91 additions & 0 deletions be/test/exec/pipeline/pipeline_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "common/config.h"
#include "common/status.h"
#include "exec/operator/operator.h"
#include "exec/operator/spill_utils.h"
Expand All @@ -32,6 +33,7 @@
#include "testutil/mock/mock_runtime_state.h"
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
#include "testutil/mock/mock_workload_group_mgr.h"
#include "util/debug_points.h"

namespace doris {

Expand Down Expand Up @@ -1534,4 +1536,93 @@ TEST_F(PipelineTaskTest, TEST_REVOKE_MEMORY) {
}
}

// Test for the race condition fix between _wake_up_early and _is_pending_finish().
//
// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate()
// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false
// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to
// _always_ready from B), Thread A would set *done=true without calling operator terminate().
//
// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's
// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent
// read, ensuring terminate() is always called.
//
// This test uses a debug point injected into the else-if branch to simulate the exact bad timing:
// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false
// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check.
TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
auto num_instances = 1;
auto pip_id = 0;
auto task_id = 0;
auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
{
OperatorPtr source_op;
source_op.reset(new DummyOperator());
EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());

int op_id = 1;
int node_id = 2;
int dest_id = 3;
DataSinkOperatorPtr sink_op;
sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
EXPECT_TRUE(pip->set_sink(sink_op).ok());
}
auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
std::map<int,
std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
shared_state_map;
_runtime_state->resize_op_id_to_local_state(-1);
auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
profile.get(), shared_state_map, task_id);
task->_exec_time_slice = 10'000'000'000ULL;
{
std::vector<TScanRangeParams> scan_range;
int sender_id = 0;
TDataSink tsink;
EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
}
_query_ctx->get_execution_dependency()->set_ready();

// Get the sink's finish dependency and block it to simulate a pending async operation
// (e.g. runtime filter size sync RPC in flight).
auto* sink_finish_dep =
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
EXPECT_NE(sink_finish_dep, nullptr);
sink_finish_dep->block();

// Drive the task to EOS so it will enter the Defer's pending-finish check.
task->_operators.front()->cast<DummyOperator>()._eos = true;
{
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
// EOS reached but still blocked on finish dependency: not done yet.
EXPECT_TRUE(task->_eos);
EXPECT_FALSE(done);
EXPECT_FALSE(task->_wake_up_early);
}

// Now unblock the finish dependency (simulates the async op completing) and activate the
// debug point. The debug point fires inside the else-if branch — after _is_pending_finish()
// returns false but before the second _wake_up_early read — and calls set_wake_up_early() +
// terminate(). This precisely reproduces the race where Thread B's writes land between
// Thread A's two reads of _wake_up_early.
sink_finish_dep->set_ready();
config::enable_debug_points = true;
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
{
Comment on lines +1610 to +1612
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test flips the global config::enable_debug_points flag but restores it to false unconditionally. If another test (or future setup) enables debug points, this will leak state across tests. Prefer capturing the original value and restoring it (e.g., via RAII/Defer), similar to SpillableDebugPointHelper in be/test/exec/operator/spillable_operator_test_helper.h.

Copilot uses AI. Check for mistakes.
bool done = false;
EXPECT_TRUE(task->execute(&done).ok());
EXPECT_TRUE(task->_eos);
EXPECT_TRUE(done);
// The key assertion: even though the task took the else-if path (not the
// if(_wake_up_early) path), operator terminate() must have been called because the
// second read of _wake_up_early correctly observed the value set by the debug point.
EXPECT_TRUE(task->_wake_up_early);
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
}
DebugPoints::instance()->clear();
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DebugPoints::instance()->clear() clears all debug points globally, which can interfere with other tests in the same binary if they rely on debug points. Since this test only adds one point, prefer DebugPoints::instance()->remove("PipelineTask::execute.wake_up_early_in_else_if") and leave unrelated debug points intact.

Suggested change
DebugPoints::instance()->clear();
DebugPoints::instance()->remove("PipelineTask::execute.wake_up_early_in_else_if");

Copilot uses AI. Check for mistakes.
config::enable_debug_points = false;
}

} // namespace doris
Loading