diff --git a/be/src/exec/operator/exchange_sink_buffer.cpp b/be/src/exec/operator/exchange_sink_buffer.cpp index 2c564891da02f7..539033be04e64b 100644 --- a/be/src/exec/operator/exchange_sink_buffer.cpp +++ b/be/src/exec/operator/exchange_sink_buffer.cpp @@ -96,12 +96,11 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_ _state(state), _context(state->get_query_ctx()), _exchange_sink_num(sender_ins_ids.size()), - _send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size && - state->query_options().exchange_multi_blocks_byte_size > 0) { - if (_send_multi_blocks) { - _send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size; - } -} + _send_multi_blocks_byte_size( + state->query_options().__isset.exchange_multi_blocks_byte_size && + state->query_options().exchange_multi_blocks_byte_size > 0 + ? state->query_options().exchange_multi_blocks_byte_size + : -1) {} void ExchangeSinkBuffer::close() { // Could not clear the queue here, because there maybe a running rpc want to @@ -264,15 +263,15 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { if (q_ptr && !q_ptr->empty()) { auto& q = *q_ptr; - std::vector requests(_send_multi_blocks ? q.size() : 1); + std::vector requests(_send_multi_blocks_byte_size > 0 ? q.size() : 1); for (int i = 0; i < requests.size(); i++) { requests[i] = std::move(q.front()); q.pop(); if (requests[i].block) { - // make sure rpc byte size under the _send_multi_blocks_bytes_size + // make sure rpc byte size under the _send_multi_blocks_byte_size mem_byte += requests[i].block->ByteSizeLong(); - if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) { + if (_send_multi_blocks_byte_size > 0 && mem_byte > _send_multi_blocks_byte_size) { requests.resize(i + 1); break; } @@ -280,30 +279,17 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { } // If we have data to shuffle which is not broadcasted - auto& request = requests[0]; auto& brpc_request = instance_data.request; brpc_request->set_sender_id(channel->_parent->sender_id()); brpc_request->set_be_number(channel->_parent->be_number()); - if (_send_multi_blocks) { - for (auto& req : requests) { - if (req.block && !req.block->column_metas().empty()) { - auto add_block = brpc_request->add_blocks(); - add_block->Swap(req.block.get()); - } - } - } else { - if (request.block && !request.block->column_metas().empty()) { - brpc_request->set_allocated_block(request.block.get()); + for (auto& req : requests) { + if (req.block && !req.block->column_metas().empty()) { + auto add_block = brpc_request->add_blocks(); + add_block->Swap(req.block.get()); } } - Defer release_block([&]() { - if (!_send_multi_blocks && request.block) { - static_cast(brpc_request->release_block()); - } else { - brpc_request->clear_blocks(); - } - }); + Defer release_block([&]() { brpc_request->clear_blocks(); }); instance_data.seq += requests.size(); brpc_request->set_packet_seq(instance_data.seq); @@ -387,7 +373,8 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) { auto& broadcast_q = *broadcast_q_ptr; // If we have data to shuffle which is broadcasted - std::vector requests(_send_multi_blocks ? broadcast_q.size() : 1); + std::vector requests( + _send_multi_blocks_byte_size > 0 ? broadcast_q.size() : 1); for (int i = 0; i < requests.size(); i++) { requests[i] = broadcast_q.front(); broadcast_q.pop(); @@ -395,49 +382,37 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { if (requests[i].block_holder->get_block()) { // make sure rpc byte size under the _send_multi_blocks_bytes_size mem_byte += requests[i].block_holder->get_block()->ByteSizeLong(); - if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) { + if (_send_multi_blocks_byte_size > 0 && mem_byte > _send_multi_blocks_byte_size) { requests.resize(i + 1); break; } } } - auto& request = requests[0]; auto& brpc_request = instance_data.request; brpc_request->set_sender_id(channel->_parent->sender_id()); brpc_request->set_be_number(channel->_parent->be_number()); - if (_send_multi_blocks) { - for (int i = 0; i < requests.size(); i++) { - auto& req = requests[i]; - if (auto block = req.block_holder->get_block(); - block && !block->column_metas().empty()) { - auto add_block = brpc_request->add_blocks(); - for (int j = 0; j < block->column_metas_size(); ++j) { - add_block->add_column_metas()->CopyFrom(block->column_metas(j)); - } - add_block->set_be_exec_version(block->be_exec_version()); - add_block->set_compressed(block->compressed()); - add_block->set_compression_type(block->compression_type()); - add_block->set_uncompressed_size(block->uncompressed_size()); - add_block->set_allocated_column_values(block->mutable_column_values()); + for (int i = 0; i < requests.size(); i++) { + auto& req = requests[i]; + if (auto block = req.block_holder->get_block(); + block && !block->column_metas().empty()) { + auto add_block = brpc_request->add_blocks(); + for (int j = 0; j < block->column_metas_size(); ++j) { + add_block->add_column_metas()->CopyFrom(block->column_metas(j)); } - } - } else { - if (request.block_holder->get_block() && - !request.block_holder->get_block()->column_metas().empty()) { - brpc_request->set_allocated_block(request.block_holder->get_block()); + add_block->set_be_exec_version(block->be_exec_version()); + add_block->set_compressed(block->compressed()); + add_block->set_compression_type(block->compression_type()); + add_block->set_uncompressed_size(block->uncompressed_size()); + add_block->set_allocated_column_values(block->mutable_column_values()); } } Defer release_block([&]() { - if (!_send_multi_blocks && request.block_holder->get_block()) { - static_cast(brpc_request->release_block()); - } else { - for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) { - static_cast(brpc_request->mutable_blocks(i)->release_column_values()); - } - brpc_request->clear_blocks(); + for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) { + static_cast(brpc_request->mutable_blocks(i)->release_column_values()); } + brpc_request->clear_blocks(); }); instance_data.seq += requests.size(); brpc_request->set_packet_seq(instance_data.seq); diff --git a/be/src/exec/operator/exchange_sink_buffer.h b/be/src/exec/operator/exchange_sink_buffer.h index 7478c63f4e8f12..0f6ba8500e3dfa 100644 --- a/be/src/exec/operator/exchange_sink_buffer.h +++ b/be/src/exec/operator/exchange_sink_buffer.h @@ -335,8 +335,7 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { // The ExchangeSinkLocalState in _parents is only used in _turn_off_channel. std::vector _parents; const int64_t _exchange_sink_num; - bool _send_multi_blocks = false; - int _send_multi_blocks_byte_size = 256 * 1024; + int _send_multi_blocks_byte_size = -1; }; } // namespace doris