Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions be/src/exec/operator/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@ Status ExchangeSinkBuffer::add_block(Channel* channel, TransmitInfo&& request) {
print_id(channel->_fragment_instance_id));
}
auto& instance_data = *_rpc_instances[ins_id];
if (instance_data.rpc_channel_is_turn_off) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*instance_data.mutex);
if (instance_data.rpc_channel_is_turn_off) {
return Status::EndOfFile("receiver eof");
}
// Do not have in process rpc, directly send
if (instance_data.rpc_channel_is_idle) {
send_now = true;
Expand Down Expand Up @@ -199,12 +199,12 @@ Status ExchangeSinkBuffer::add_block(Channel* channel, BroadcastTransmitInfo&& r
print_id(channel->_fragment_instance_id));
}
auto& instance_data = *_rpc_instances[ins_id];
if (instance_data.rpc_channel_is_turn_off) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*instance_data.mutex);
if (instance_data.rpc_channel_is_turn_off) {
return Status::EndOfFile("receiver eof");
}
// Do not have in process rpc, directly send
if (instance_data.rpc_channel_is_idle) {
send_now = true;
Expand Down Expand Up @@ -586,6 +586,13 @@ void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins,
for (auto& parent : _parents) {
parent->on_channel_finished(ins.id);
}
} else {
// Task execution context is already gone. The pipeline fragment context is being
// destroyed, so on_channel_finished is skipped. This is normally safe because
// unblock_all_dependencies() should have already set finish_dependency to always_ready.
LOG(INFO) << "ExchangeSinkBuffer::_turn_off_channel: task context is null, "
<< "skipping on_channel_finished for instance " << ins.id
<< ", dest_node_id=" << _dest_node_id << ", node_id=" << _node_id;
}
}

Expand Down Expand Up @@ -683,7 +690,9 @@ std::string ExchangeSinkBuffer::debug_each_instance_queue_size() {
for (auto& [_, list] : instance_data->package_queue) {
queue_size += list.size();
}
fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, queue_size);
fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}, is turn off: {}\n",
fmt::format(FMT_COMPILE("{:x}"), static_cast<uint64_t>(id)), queue_size,
instance_data->rpc_channel_is_turn_off);
}
return fmt::to_string(debug_string_buffer);
}
Expand Down
2 changes: 0 additions & 2 deletions be/test/exec/pipeline/vdata_stream_recvr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,4 @@ TEST_F(DataStreamRecvrTest, transmit_block) {
recvr->close();
}

// ./run-be-ut.sh --run --filter=DataStreamRecvrTest.*

} // namespace doris
Loading