From 5002ea9890c80328d3a294b0f649d5d274209b00 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Tue, 28 Apr 2026 09:38:34 +0800 Subject: [PATCH] [fix](be) Fix exchange receiver dependency race (#62777) ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: Data or EOS could arrive after the receiver is registered but before its source dependency is installed. In that window, ready notification was lost and the exchange source could remain blocked. Recheck the queue state when setting the dependency and protect channel turn-off checks with the instance mutex. ### Release note Fix a potential query hang caused by an exchange receiver dependency ready-notification race. ### Check List (For Author) - Test: Unit Test / Manual test - Unit Test: ./run-be-ut.sh --run --filter=DataStreamRecvrTest.TestEosBeforeSetDependency:DataStreamRecvrTest.TestDataBeforeSetDependencyWithRemainingSenders - Manual test: build-support/check-format.sh - Static analysis attempted: build-support/run-clang-tidy.sh --build-dir be/ut_build_ASAN failed because clang-tidy could not analyze the files due to environment/pre-existing diagnostics, including missing stddef.h from system/libstdc++ headers and an existing unmatched NOLINTEND in be/src/core/types.h - Behavior changed: Yes. Exchange source dependencies are now marked ready if queued data or EOS arrived before set_dependency(). - Does this need documentation: No ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [ ] Yes. - Does this need documentation? - [ ] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- be/src/exec/operator/exchange_sink_buffer.cpp | 23 +++++++++++++------ .../exec/pipeline/vdata_stream_recvr_test.cpp | 2 -- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/be/src/exec/operator/exchange_sink_buffer.cpp b/be/src/exec/operator/exchange_sink_buffer.cpp index a7ec1092115692..34a1c6de6664aa 100644 --- a/be/src/exec/operator/exchange_sink_buffer.cpp +++ b/be/src/exec/operator/exchange_sink_buffer.cpp @@ -158,12 +158,12 @@ Status ExchangeSinkBuffer::add_block(Channel* channel, TransmitInfo&& request) { print_id(channel->_fragment_instance_id)); } auto& instance_data = *_rpc_instances[ins_id]; - if (instance_data.rpc_channel_is_turn_off) { - return Status::EndOfFile("receiver eof"); - } bool send_now = false; { std::unique_lock lock(*instance_data.mutex); + if (instance_data.rpc_channel_is_turn_off) { + return Status::EndOfFile("receiver eof"); + } // Do not have in process rpc, directly send if (instance_data.rpc_channel_is_idle) { send_now = true; @@ -199,12 +199,12 @@ Status ExchangeSinkBuffer::add_block(Channel* channel, BroadcastTransmitInfo&& r print_id(channel->_fragment_instance_id)); } auto& instance_data = *_rpc_instances[ins_id]; - if (instance_data.rpc_channel_is_turn_off) { - return Status::EndOfFile("receiver eof"); - } bool send_now = false; { std::unique_lock lock(*instance_data.mutex); + if (instance_data.rpc_channel_is_turn_off) { + return Status::EndOfFile("receiver eof"); + } // Do not have in process rpc, directly send if (instance_data.rpc_channel_is_idle) { send_now = true; @@ -581,6 +581,13 @@ void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins, for (auto& parent : _parents) { parent->on_channel_finished(ins.id); } + } else { + // Task execution context is already gone. The pipeline fragment context is being + // destroyed, so on_channel_finished is skipped. This is normally safe because + // unblock_all_dependencies() should have already set finish_dependency to always_ready. + LOG(INFO) << "ExchangeSinkBuffer::_turn_off_channel: task context is null, " + << "skipping on_channel_finished for instance " << ins.id + << ", dest_node_id=" << _dest_node_id << ", node_id=" << _node_id; } } @@ -678,7 +685,9 @@ std::string ExchangeSinkBuffer::debug_each_instance_queue_size() { for (auto& [_, list] : instance_data->package_queue) { queue_size += list.size(); } - fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, queue_size); + fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}, is turn off: {}\n", + fmt::format(FMT_COMPILE("{:x}"), static_cast(id)), queue_size, + instance_data->rpc_channel_is_turn_off); } return fmt::to_string(debug_string_buffer); } diff --git a/be/test/exec/pipeline/vdata_stream_recvr_test.cpp b/be/test/exec/pipeline/vdata_stream_recvr_test.cpp index cab061da1e5c61..ab6b03b13c5572 100644 --- a/be/test/exec/pipeline/vdata_stream_recvr_test.cpp +++ b/be/test/exec/pipeline/vdata_stream_recvr_test.cpp @@ -616,6 +616,4 @@ TEST_F(DataStreamRecvrTest, transmit_block) { recvr->close(); } -// ./run-be-ut.sh --run --filter=DataStreamRecvrTest.* - } // namespace doris