Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 31 additions & 56 deletions be/src/exec/operator/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,11 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_
_state(state),
_context(state->get_query_ctx()),
_exchange_sink_num(sender_ins_ids.size()),
_send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size &&
state->query_options().exchange_multi_blocks_byte_size > 0) {
if (_send_multi_blocks) {
_send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size;
}
}
_send_multi_blocks_byte_size(
state->query_options().__isset.exchange_multi_blocks_byte_size &&
state->query_options().exchange_multi_blocks_byte_size > 0
? state->query_options().exchange_multi_blocks_byte_size
: -1) {}

void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want to
Expand Down Expand Up @@ -264,46 +263,33 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
if (q_ptr && !q_ptr->empty()) {
auto& q = *q_ptr;

std::vector<TransmitInfo> requests(_send_multi_blocks ? q.size() : 1);
std::vector<TransmitInfo> requests(_send_multi_blocks_byte_size > 0 ? q.size() : 1);
for (int i = 0; i < requests.size(); i++) {
requests[i] = std::move(q.front());
q.pop();

if (requests[i].block) {
// make sure rpc byte size under the _send_multi_blocks_bytes_size
// make sure rpc byte size under the _send_multi_blocks_byte_size
mem_byte += requests[i].block->ByteSizeLong();
if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
if (_send_multi_blocks_byte_size > 0 && mem_byte > _send_multi_blocks_byte_size) {
requests.resize(i + 1);
break;
}
}
}

// If we have data to shuffle which is not broadcasted
auto& request = requests[0];
auto& brpc_request = instance_data.request;
brpc_request->set_sender_id(channel->_parent->sender_id());
brpc_request->set_be_number(channel->_parent->be_number());

if (_send_multi_blocks) {
for (auto& req : requests) {
if (req.block && !req.block->column_metas().empty()) {
auto add_block = brpc_request->add_blocks();
add_block->Swap(req.block.get());
}
}
} else {
if (request.block && !request.block->column_metas().empty()) {
brpc_request->set_allocated_block(request.block.get());
for (auto& req : requests) {
if (req.block && !req.block->column_metas().empty()) {
auto add_block = brpc_request->add_blocks();
add_block->Swap(req.block.get());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This drops the legacy block wire path and always sends exchange payloads through blocks, even when _send_multi_blocks_byte_size <= 0 and only one block is sent. That breaks mixed-version BE compatibility: VDataStreamMgr::transmit_data() still treats request->has_block() as the old compatibility path, so an older receiver that does not handle field 13 will ignore this payload and only see EOS, losing exchange rows during rolling upgrade. Please keep using set_allocated_block() when multi-block exchange is disabled, or gate the repeated blocks field on a version/compatibility check.

}
}
Defer release_block([&]() {
if (!_send_multi_blocks && request.block) {
static_cast<void>(brpc_request->release_block());
} else {
brpc_request->clear_blocks();
}
});
Defer release_block([&]() { brpc_request->clear_blocks(); });

instance_data.seq += requests.size();
brpc_request->set_packet_seq(instance_data.seq);
Expand Down Expand Up @@ -387,57 +373,46 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
} else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) {
auto& broadcast_q = *broadcast_q_ptr;
// If we have data to shuffle which is broadcasted
std::vector<BroadcastTransmitInfo> requests(_send_multi_blocks ? broadcast_q.size() : 1);
std::vector<BroadcastTransmitInfo> requests(
_send_multi_blocks_byte_size > 0 ? broadcast_q.size() : 1);
for (int i = 0; i < requests.size(); i++) {
requests[i] = broadcast_q.front();
broadcast_q.pop();

if (requests[i].block_holder->get_block()) {
// make sure rpc byte size under the _send_multi_blocks_bytes_size
mem_byte += requests[i].block_holder->get_block()->ByteSizeLong();
if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) {
if (_send_multi_blocks_byte_size > 0 && mem_byte > _send_multi_blocks_byte_size) {
requests.resize(i + 1);
break;
}
}
}

auto& request = requests[0];
auto& brpc_request = instance_data.request;
brpc_request->set_sender_id(channel->_parent->sender_id());
brpc_request->set_be_number(channel->_parent->be_number());

if (_send_multi_blocks) {
for (int i = 0; i < requests.size(); i++) {
auto& req = requests[i];
if (auto block = req.block_holder->get_block();
block && !block->column_metas().empty()) {
auto add_block = brpc_request->add_blocks();
for (int j = 0; j < block->column_metas_size(); ++j) {
add_block->add_column_metas()->CopyFrom(block->column_metas(j));
}
add_block->set_be_exec_version(block->be_exec_version());
add_block->set_compressed(block->compressed());
add_block->set_compression_type(block->compression_type());
add_block->set_uncompressed_size(block->uncompressed_size());
add_block->set_allocated_column_values(block->mutable_column_values());
for (int i = 0; i < requests.size(); i++) {
auto& req = requests[i];
if (auto block = req.block_holder->get_block();
block && !block->column_metas().empty()) {
auto add_block = brpc_request->add_blocks();
for (int j = 0; j < block->column_metas_size(); ++j) {
add_block->add_column_metas()->CopyFrom(block->column_metas(j));
}
}
} else {
if (request.block_holder->get_block() &&
!request.block_holder->get_block()->column_metas().empty()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
add_block->set_be_exec_version(block->be_exec_version());
add_block->set_compressed(block->compressed());
add_block->set_compression_type(block->compression_type());
add_block->set_uncompressed_size(block->uncompressed_size());
add_block->set_allocated_column_values(block->mutable_column_values());
}
}
Defer release_block([&]() {
if (!_send_multi_blocks && request.block_holder->get_block()) {
static_cast<void>(brpc_request->release_block());
} else {
for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) {
static_cast<void>(brpc_request->mutable_blocks(i)->release_column_values());
}
brpc_request->clear_blocks();
for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) {
static_cast<void>(brpc_request->mutable_blocks(i)->release_column_values());
}
brpc_request->clear_blocks();
});
instance_data.seq += requests.size();
brpc_request->set_packet_seq(instance_data.seq);
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/operator/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,7 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
// The ExchangeSinkLocalState in _parents is only used in _turn_off_channel.
std::vector<ExchangeSinkLocalState*> _parents;
const int64_t _exchange_sink_num;
bool _send_multi_blocks = false;
int _send_multi_blocks_byte_size = 256 * 1024;
int _send_multi_blocks_byte_size = -1;
};

} // namespace doris
Loading