[fix](scan) Avoid memory allocated by buffered_reader from being traced#42558
[fix](scan) Avoid memory allocated by buffered_reader from being traced#42558yiguolei merged 4 commits intoapache:spill_and_reservefrom
Conversation
## Proposed changes remove some unused code of Channel
…ed (apache#41921) Use OwnedSlice to replace `char*` in BufferedReader ## Proposed changes Issue Number: close #xxx <!--Describe your changes.-->
|
Thank you for your contribution to Apache Doris. Since 2024-03-18, the Document has been moved to doris-website. |
| static_cast<void>(channel->close(state)); | ||
| } | ||
|
|
||
| Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { |
There was a problem hiding this comment.
warning: function 'sink' has cognitive complexity of 242 (threshold 50) [readability-function-cognitive-complexity]
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) {
^Additional context
be/src/pipeline/exec/exchange_sink_operator.cpp:400: +1, including nesting penalty of 0, nesting level increased to 1
if (all_receiver_eof) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:409: +1, including nesting penalty of 0, nesting level increased to 1
if (state->get_query_ctx()->low_memory_mode() && local_state._sink_buffer != nullptr) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:409: +1
if (state->get_query_ctx()->low_memory_mode() && local_state._sink_buffer != nullptr) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:413: +1, including nesting penalty of 0, nesting level increased to 1
if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:417: +2, including nesting penalty of 1, nesting level increased to 2
if (local_state.only_local_exchange) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:418: +3, including nesting penalty of 2, nesting level increased to 3
if (!block->empty()) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:433: +1, nesting level increased to 2
} else {
^be/src/pipeline/exec/exchange_sink_operator.cpp:437: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:437: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:440: +3, including nesting penalty of 2, nesting level increased to 3
if (serialized) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:442: +4, including nesting penalty of 3, nesting level increased to 4
if (!cur_block.empty()) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:443: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(local_state._serializer.serialize_block(
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:443: +6, including nesting penalty of 5, nesting level increased to 6
RETURN_IF_ERROR(local_state._serializer.serialize_block(
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:446: +1, nesting level increased to 4
} else {
^be/src/pipeline/exec/exchange_sink_operator.cpp:450: +4, including nesting penalty of 3, nesting level increased to 4
if (state->get_query_ctx()->low_memory_mode()) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:475: +4, including nesting penalty of 3, nesting level increased to 4
if (moved) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:477: +1, nesting level increased to 4
} else {
^be/src/pipeline/exec/exchange_sink_operator.cpp:485: +1, nesting level increased to 1
} else if (_part_type == TPartitionType::RANDOM) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:488: +2, including nesting penalty of 1, nesting level increased to 2
if (!current_channel->is_receiver_eof()) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:490: +3, including nesting penalty of 2, nesting level increased to 3
if (current_channel->is_local()) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:492: +4, including nesting penalty of 3, nesting level increased to 4
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:224: expanded from macro 'HANDLE_CHANNEL_STATUS'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:492: +5, including nesting penalty of 4, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:225: expanded from macro 'HANDLE_CHANNEL_STATUS'
if (status.is<ErrorCode::END_OF_FILE>()) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:492: +1, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:227: expanded from macro 'HANDLE_CHANNEL_STATUS'
} else { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:492: +6, including nesting penalty of 5, nesting level increased to 6
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:228: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:492: +7, including nesting penalty of 6, nesting level increased to 7
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:228: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:493: +1, nesting level increased to 3
} else {
^be/src/pipeline/exec/exchange_sink_operator.cpp:495: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:495: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:497: +4, including nesting penalty of 3, nesting level increased to 4
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:224: expanded from macro 'HANDLE_CHANNEL_STATUS'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:497: +5, including nesting penalty of 4, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:225: expanded from macro 'HANDLE_CHANNEL_STATUS'
if (status.is<ErrorCode::END_OF_FILE>()) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:497: +1, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:227: expanded from macro 'HANDLE_CHANNEL_STATUS'
} else { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:497: +6, including nesting penalty of 5, nesting level increased to 6
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:228: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:497: +7, including nesting penalty of 6, nesting level increased to 7
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:228: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:502: +1, nesting level increased to 1
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
^be/src/pipeline/exec/exchange_sink_operator.cpp:507: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block));
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:507: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block));
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:513: +2, including nesting penalty of 1, nesting level increased to 2
if (_part_type == TPartitionType::HASH_PARTITIONED) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:514: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(channel_add_rows(
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:514: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(channel_add_rows(
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:517: +1, nesting level increased to 2
} else {
^be/src/pipeline/exec/exchange_sink_operator.cpp:518: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(channel_add_rows(
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:518: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(channel_add_rows(
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:530: +1, nesting level increased to 1
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:536: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(local_state._send_new_partition_batch());
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:536: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_state._send_new_partition_batch());
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:543: +2, including nesting penalty of 1, nesting level increased to 2
if (input_rows > 0) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:548: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:548: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:554: +3, including nesting penalty of 2, nesting level increased to 3
for (int idx = 0; idx < row_ids.size(); ++idx) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:562: +2, including nesting penalty of 1, nesting level increased to 2
if (eos) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:564: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_state._send_new_partition_batch());
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:564: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(local_state._send_new_partition_batch());
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:568: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:568: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:578: +1, nesting level increased to 1
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:585: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block));
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:585: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block));
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:589: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(channel_add_rows_with_idx(
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:589: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(channel_add_rows_with_idx(
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:600: +1, nesting level increased to 1
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:604: +2, including nesting penalty of 1, nesting level increased to 2
if (!current_channel->is_receiver_eof()) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:606: +3, including nesting penalty of 2, nesting level increased to 3
if (current_channel->is_local()) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:608: +4, including nesting penalty of 3, nesting level increased to 4
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:224: expanded from macro 'HANDLE_CHANNEL_STATUS'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:608: +5, including nesting penalty of 4, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:225: expanded from macro 'HANDLE_CHANNEL_STATUS'
if (status.is<ErrorCode::END_OF_FILE>()) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:608: +1, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:227: expanded from macro 'HANDLE_CHANNEL_STATUS'
} else { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:608: +6, including nesting penalty of 5, nesting level increased to 6
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:228: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:608: +7, including nesting penalty of 6, nesting level increased to 7
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:228: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:609: +1, nesting level increased to 3
} else {
^be/src/pipeline/exec/exchange_sink_operator.cpp:611: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:611: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:613: +4, including nesting penalty of 3, nesting level increased to 4
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:224: expanded from macro 'HANDLE_CHANNEL_STATUS'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:613: +5, including nesting penalty of 4, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:225: expanded from macro 'HANDLE_CHANNEL_STATUS'
if (status.is<ErrorCode::END_OF_FILE>()) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:613: +1, nesting level increased to 5
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:227: expanded from macro 'HANDLE_CHANNEL_STATUS'
} else { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:613: +6, including nesting penalty of 5, nesting level increased to 6
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:228: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^be/src/common/status.h:639: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:613: +7, including nesting penalty of 6, nesting level increased to 7
HANDLE_CHANNEL_STATUS(state, current_channel, status);
^be/src/vec/sink/vdata_stream_sender.h:228: expanded from macro 'HANDLE_CHANNEL_STATUS'
RETURN_IF_ERROR(status); \
^be/src/common/status.h:641: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/pipeline/exec/exchange_sink_operator.cpp:618: +2, including nesting penalty of 1, nesting level increased to 2
if (_writer_count < local_state.channels.size()) {
^be/src/pipeline/exec/exchange_sink_operator.cpp:619: +3, including nesting penalty of 2, nesting level increased to 3
if (_data_processed >=
^be/src/pipeline/exec/exchange_sink_operator.cpp:626: +1, nesting level increased to 1
} else {
^be/src/pipeline/exec/exchange_sink_operator.cpp:633: +1, including nesting penalty of 0, nesting level increased to 1
if (eos) {
^…ed (#42558) Issue Number: close #xxx <!--Describe your changes.--> --------- Co-authored-by: Pxl <pxl290@qq.com> Co-authored-by: Gabriel <gabrielleebuaa@gmail.com>
Proposed changes
Issue Number: close #xxx