diff --git a/be/src/exec/operator/exchange_sink_buffer.cpp b/be/src/exec/operator/exchange_sink_buffer.cpp index ec032aca17ed26..2c564891da02f7 100644 --- a/be/src/exec/operator/exchange_sink_buffer.cpp +++ b/be/src/exec/operator/exchange_sink_buffer.cpp @@ -158,12 +158,12 @@ Status ExchangeSinkBuffer::add_block(Channel* channel, TransmitInfo&& request) { print_id(channel->_fragment_instance_id)); } auto& instance_data = *_rpc_instances[ins_id]; - if (instance_data.rpc_channel_is_turn_off) { - return Status::EndOfFile("receiver eof"); - } bool send_now = false; { std::unique_lock lock(*instance_data.mutex); + if (instance_data.rpc_channel_is_turn_off) { + return Status::EndOfFile("receiver eof"); + } // Do not have in process rpc, directly send if (instance_data.rpc_channel_is_idle) { send_now = true; @@ -199,12 +199,12 @@ Status ExchangeSinkBuffer::add_block(Channel* channel, BroadcastTransmitInfo&& r print_id(channel->_fragment_instance_id)); } auto& instance_data = *_rpc_instances[ins_id]; - if (instance_data.rpc_channel_is_turn_off) { - return Status::EndOfFile("receiver eof"); - } bool send_now = false; { std::unique_lock lock(*instance_data.mutex); + if (instance_data.rpc_channel_is_turn_off) { + return Status::EndOfFile("receiver eof"); + } // Do not have in process rpc, directly send if (instance_data.rpc_channel_is_idle) { send_now = true; @@ -586,6 +586,13 @@ void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins, for (auto& parent : _parents) { parent->on_channel_finished(ins.id); } + } else { + // Task execution context is already gone. The pipeline fragment context is being + // destroyed, so on_channel_finished is skipped. This is normally safe because + // unblock_all_dependencies() should have already set finish_dependency to always_ready. + LOG(INFO) << "ExchangeSinkBuffer::_turn_off_channel: task context is null, " + << "skipping on_channel_finished for instance " << ins.id + << ", dest_node_id=" << _dest_node_id << ", node_id=" << _node_id; } } @@ -683,7 +690,9 @@ std::string ExchangeSinkBuffer::debug_each_instance_queue_size() { for (auto& [_, list] : instance_data->package_queue) { queue_size += list.size(); } - fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, queue_size); + fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}, is turn off: {}\n", + fmt::format(FMT_COMPILE("{:x}"), static_cast(id)), queue_size, + instance_data->rpc_channel_is_turn_off); } return fmt::to_string(debug_string_buffer); } diff --git a/be/test/exec/pipeline/vdata_stream_recvr_test.cpp b/be/test/exec/pipeline/vdata_stream_recvr_test.cpp index cab061da1e5c61..ab6b03b13c5572 100644 --- a/be/test/exec/pipeline/vdata_stream_recvr_test.cpp +++ b/be/test/exec/pipeline/vdata_stream_recvr_test.cpp @@ -616,6 +616,4 @@ TEST_F(DataStreamRecvrTest, transmit_block) { recvr->close(); } -// ./run-be-ut.sh --run --filter=DataStreamRecvrTest.* - } // namespace doris