Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions be/src/vec/exec/vsort_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "vec/core/sort_block.h"
#include "vec/utils/util.hpp"

namespace doris::vectorized {

VSortNode::VSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
_num_rows_skipped(0) {}
_num_rows_skipped(0),
_unsorted_block(nullptr) {}

Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
Expand All @@ -44,6 +46,8 @@ Status VSortNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor));
_unsorted_block.reset(new MutableBlock(
VectorizedUtils::create_empty_columnswithtypename(child(0)->row_desc())));
return Status::OK();
}

Expand Down Expand Up @@ -124,25 +128,29 @@ void VSortNode::debug_string(int indentation_level, stringstream* out) const {
Status VSortNode::sort_input(RuntimeState* state) {
bool eos = false;
do {
Block block;
RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block, &eos),
child(0)->get_next_span(), eos);
auto rows = block.rows();

if (rows != 0) {
RETURN_IF_ERROR(pretreat_block(block));
size_t mem_usage = block.allocated_bytes();

do {
Block upstream_block;
RETURN_IF_ERROR_AND_CHECK_SPAN(
child(0)->get_next_after_projects(state, &upstream_block, &eos),
child(0)->get_next_span(), eos);
if (upstream_block.rows() != 0) {
_unsorted_block->merge(upstream_block);
}
} while (!eos && _unsorted_block->rows() < BUFFERED_BLOCK_SIZE &&
_unsorted_block->allocated_bytes() < BUFFERED_BLOCK_BYTES);
if (_unsorted_block->rows() > 0) {
_total_mem_usage += _unsorted_block->allocated_bytes();
Block block = _unsorted_block->to_block(0);
RETURN_IF_ERROR(partial_sort(block));
// dispose TOP-N logic
if (_limit != -1) {
// Here is a little opt to reduce the mem uasge, we build a max heap
// to order the block in _block_priority_queue.
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_num_rows_in_block < _limit) {
_total_mem_usage += mem_usage;
_sorted_blocks.emplace_back(std::move(block));
_num_rows_in_block += rows;
_num_rows_in_block += block.rows();
_block_priority_queue.emplace(_pool->add(
new SortCursorImpl(_sorted_blocks.back(), _sort_description)));
} else {
Expand All @@ -151,27 +159,26 @@ Status VSortNode::sort_input(RuntimeState* state) {
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
_sorted_blocks.emplace_back(std::move(block));
_block_priority_queue.push(block_cursor);
_total_mem_usage += mem_usage;
} else {
continue;
}
}
} else {
// dispose normal sort logic
_total_mem_usage += mem_usage;
_sorted_blocks.emplace_back(std::move(block));
}

RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input."));
_unsorted_block.reset(new MutableBlock(
VectorizedUtils::create_empty_columnswithtypename(child(0)->row_desc())));
}
} while (!eos);

build_merge_tree();
return Status::OK();
}

Status VSortNode::pretreat_block(doris::vectorized::Block& block) {
Status VSortNode::partial_sort(doris::vectorized::Block& block) {
if (_vsort_exec_exprs.need_materialize_tuple()) {
auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/vsort_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class VSortNode : public doris::ExecNode {
// Fetch input rows and feed them to the sorter until the input is exhausted.
Status sort_input(RuntimeState* state);

Status pretreat_block(Block& block);
Status partial_sort(Block& block);

void build_merge_tree();

Expand All @@ -84,6 +84,11 @@ class VSortNode : public doris::ExecNode {
// only valid in TOP-N node
uint64_t _num_rows_in_block = 0;
std::priority_queue<SortBlockCursor> _block_priority_queue;

std::unique_ptr<MutableBlock> _unsorted_block;

static constexpr size_t BUFFERED_BLOCK_SIZE = 1024 * 1024;
static constexpr size_t BUFFERED_BLOCK_BYTES = 16 << 20;
};

} // namespace doris::vectorized
Original file line number Diff line number Diff line change
Expand Up @@ -306,14 +306,14 @@ USA Pete Hello
-- !last_value1 --
\N \N
-9223372036854775807 false
-9223372036854775807 false
-9223372036854775807 true
-11011907 false
-11011903 true
123456 true
7210457 false
11011902 false
11011902 false
11011902 false
11011902 true
11011902 true
11011903 false
11011905 false
11011920 true
Expand All @@ -324,14 +324,14 @@ USA Pete Hello
-- !last_value2 --
\N \N
-9223372036854775807 false
-9223372036854775807 false
-9223372036854775807 true
-11011907 false
-11011903 true
123456 true
7210457 false
11011902 false
11011902 false
11011902 false
11011902 true
11011902 true
11011903 false
11011905 false
11011920 true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,12 @@ suite("test_window_function") {
order by a, wjj"""

// test_query_last_value
qt_last_value1"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3})
as wj from baseall order by ${k1}, wj"""
qt_last_value2"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3}
qt_last_value1"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2})
as wj from baseall order by ${k1}, wj"""
qt_last_value2"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2}
range between unbounded preceding and current row)
as wj from baseall order by ${k1}, wj"""
qt_last_value3"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3}
qt_last_value3"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2}
rows between unbounded preceding and current row)
as wj from baseall order by ${k1}, wj"""
qt_last_value4"""select a, max(d) as wjj from
Expand Down