From 6ca231252537eff3dda887e04c499102979fbc77 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 5 Sep 2022 13:04:02 +0800 Subject: [PATCH 1/6] [Improvement](sort) Accumulate blocks to do partial sort --- be/src/vec/exec/vsort_node.cpp | 83 ++++++++++++++++++---------------- be/src/vec/exec/vsort_node.h | 7 ++- 2 files changed, 51 insertions(+), 39 deletions(-) diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 8749b2913ea613..1b534af8a72c3c 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -22,13 +22,15 @@ #include "runtime/runtime_state.h" #include "util/debug_util.h" #include "vec/core/sort_block.h" +#include "vec/utils/util.hpp" namespace doris::vectorized { VSortNode::VSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), - _num_rows_skipped(0) {} + _num_rows_skipped(0), + _unsorted_block(nullptr) {} Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); @@ -44,6 +46,8 @@ Status VSortNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor)); + _unsorted_block.reset(new MutableBlock( + VectorizedUtils::create_empty_columnswithtypename(child(0)->row_desc()))); return Status::OK(); } @@ -124,54 +128,57 @@ void VSortNode::debug_string(int indentation_level, stringstream* out) const { Status VSortNode::sort_input(RuntimeState* state) { bool eos = false; do { - Block block; - RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block, &eos), - child(0)->get_next_span(), eos); - auto rows = block.rows(); - - if (rows != 0) { - RETURN_IF_ERROR(pretreat_block(block)); - size_t mem_usage = block.allocated_bytes(); - - // dispose TOP-N logic - if (_limit != -1) { - // Here is a little opt to reduce the mem uasge, we build a max heap - // to order the block in _block_priority_queue. - // if one block totally greater the heap top of _block_priority_queue - // we can throw the block data directly. - if (_num_rows_in_block < _limit) { - _total_mem_usage += mem_usage; + do { + Block upstream_block; + RETURN_IF_ERROR_AND_CHECK_SPAN( + child(0)->get_next_after_projects(state, &upstream_block, &eos), + child(0)->get_next_span(), eos); + if (upstream_block.rows() != 0) { + size_t mem_usage = upstream_block.allocated_bytes(); + _total_mem_usage += mem_usage; + _unsorted_block->merge(upstream_block); + } + } while (!eos && _unsorted_block->rows() < BufferedBlockSize && + _unsorted_block->allocated_bytes() < BufferedBlockBytes); + Block block = _unsorted_block->to_block(0); + RETURN_IF_ERROR(partial_sort(block)); + + // dispose TOP-N logic + if (_limit != -1) { + // Here is a little opt to reduce the mem uasge, we build a max heap + // to order the block in _block_priority_queue. + // if one block totally greater the heap top of _block_priority_queue + // we can throw the block data directly. + if (_num_rows_in_block < _limit) { + _sorted_blocks.emplace_back(std::move(block)); + _num_rows_in_block += block.rows(); + _block_priority_queue.emplace( + _pool->add(new SortCursorImpl(_sorted_blocks.back(), _sort_description))); + } else { + SortBlockCursor block_cursor( + _pool->add(new SortCursorImpl(block, _sort_description))); + if (!block_cursor.totally_greater(_block_priority_queue.top())) { _sorted_blocks.emplace_back(std::move(block)); - _num_rows_in_block += rows; - _block_priority_queue.emplace(_pool->add( - new SortCursorImpl(_sorted_blocks.back(), _sort_description))); + _block_priority_queue.push(block_cursor); } else { - SortBlockCursor block_cursor( - _pool->add(new SortCursorImpl(block, _sort_description))); - if (!block_cursor.totally_greater(_block_priority_queue.top())) { - _sorted_blocks.emplace_back(std::move(block)); - _block_priority_queue.push(block_cursor); - _total_mem_usage += mem_usage; - } else { - continue; - } + continue; } - } else { - // dispose normal sort logic - _total_mem_usage += mem_usage; - _sorted_blocks.emplace_back(std::move(block)); } - - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input.")); + } else { + // dispose normal sort logic + _sorted_blocks.emplace_back(std::move(block)); } + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input.")); + _unsorted_block.reset(new MutableBlock( + VectorizedUtils::create_empty_columnswithtypename(child(0)->row_desc()))); } while (!eos); build_merge_tree(); return Status::OK(); } -Status VSortNode::pretreat_block(doris::vectorized::Block& block) { +Status VSortNode::partial_sort(doris::vectorized::Block& block) { if (_vsort_exec_exprs.need_materialize_tuple()) { auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs(); std::vector valid_column_ids(output_tuple_expr_ctxs.size()); diff --git a/be/src/vec/exec/vsort_node.h b/be/src/vec/exec/vsort_node.h index f67326afa67641..e6a4d2700d49c4 100644 --- a/be/src/vec/exec/vsort_node.h +++ b/be/src/vec/exec/vsort_node.h @@ -57,7 +57,7 @@ class VSortNode : public doris::ExecNode { // Fetch input rows and feed them to the sorter until the input is exhausted. Status sort_input(RuntimeState* state); - Status pretreat_block(Block& block); + Status partial_sort(Block& block); void build_merge_tree(); @@ -84,6 +84,11 @@ class VSortNode : public doris::ExecNode { // only valid in TOP-N node uint64_t _num_rows_in_block = 0; std::priority_queue _block_priority_queue; + + std::unique_ptr _unsorted_block; + + static constexpr size_t BufferedBlockSize = 1024 * 1024; + static constexpr size_t BufferedBlockBytes = 16 << 20; }; } // namespace doris::vectorized From 9ae8c25c7b2da7319bca39de54f04d9e54f69f58 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 5 Sep 2022 14:56:36 +0800 Subject: [PATCH 2/6] update --- be/src/vec/exec/vsort_node.cpp | 50 ++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 1b534af8a72c3c..73048f7e1e2091 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -143,35 +143,37 @@ Status VSortNode::sort_input(RuntimeState* state) { Block block = _unsorted_block->to_block(0); RETURN_IF_ERROR(partial_sort(block)); - // dispose TOP-N logic - if (_limit != -1) { - // Here is a little opt to reduce the mem uasge, we build a max heap - // to order the block in _block_priority_queue. - // if one block totally greater the heap top of _block_priority_queue - // we can throw the block data directly. - if (_num_rows_in_block < _limit) { - _sorted_blocks.emplace_back(std::move(block)); - _num_rows_in_block += block.rows(); - _block_priority_queue.emplace( - _pool->add(new SortCursorImpl(_sorted_blocks.back(), _sort_description))); - } else { - SortBlockCursor block_cursor( - _pool->add(new SortCursorImpl(block, _sort_description))); - if (!block_cursor.totally_greater(_block_priority_queue.top())) { + if (block.rows() > 0) { + // dispose TOP-N logic + if (_limit != -1) { + // Here is a little opt to reduce the mem uasge, we build a max heap + // to order the block in _block_priority_queue. + // if one block totally greater the heap top of _block_priority_queue + // we can throw the block data directly. + if (_num_rows_in_block < _limit) { _sorted_blocks.emplace_back(std::move(block)); - _block_priority_queue.push(block_cursor); + _num_rows_in_block += block.rows(); + _block_priority_queue.emplace(_pool->add( + new SortCursorImpl(_sorted_blocks.back(), _sort_description))); } else { - continue; + SortBlockCursor block_cursor( + _pool->add(new SortCursorImpl(block, _sort_description))); + if (!block_cursor.totally_greater(_block_priority_queue.top())) { + _sorted_blocks.emplace_back(std::move(block)); + _block_priority_queue.push(block_cursor); + } else { + continue; + } } + } else { + // dispose normal sort logic + _sorted_blocks.emplace_back(std::move(block)); } - } else { - // dispose normal sort logic - _sorted_blocks.emplace_back(std::move(block)); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input.")); + _unsorted_block.reset(new MutableBlock( + VectorizedUtils::create_empty_columnswithtypename(child(0)->row_desc()))); } - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input.")); - _unsorted_block.reset(new MutableBlock( - VectorizedUtils::create_empty_columnswithtypename(child(0)->row_desc()))); } while (!eos); build_merge_tree(); From 5440bd5e585fe3ba4df940beba70f67ae4a1d1fe Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 6 Sep 2022 11:48:20 +0800 Subject: [PATCH 3/6] update --- .../window_functions/test_window_function.out | 30 ++++--------------- 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out b/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out index ae4252ae19ff6f..b5d0c2bc8975a1 100644 --- a/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out +++ b/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out @@ -306,14 +306,14 @@ USA Pete Hello -- !last_value1 -- \N \N -9223372036854775807 false --9223372036854775807 false +-9223372036854775807 true -11011907 false -11011903 true 123456 true 7210457 false 11011902 false -11011902 false -11011902 false +11011902 true +11011902 true 11011903 false 11011905 false 11011920 true @@ -324,14 +324,14 @@ USA Pete Hello -- !last_value2 -- \N \N -9223372036854775807 false --9223372036854775807 false +-9223372036854775807 true -11011907 false -11011903 true 123456 true 7210457 false 11011902 false -11011902 false -11011902 false +11011902 true +11011902 true 11011903 false 11011905 false 11011920 true @@ -534,24 +534,6 @@ USA Pete Hello 9223372036854775807 1 9223372036854775807 2 --- !hujie1 -- -\N \N -1 true -2 false -3 false -4 false -5 true -6 true -7 false -8 true -9 true -10 false -11 true -12 false -13 false -14 false -15 true - -- !hujie2 -- \N \N 1 true From 9cf4341bf6aaa9c74fb91943aee1c8727407fa9d Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 6 Sep 2022 14:13:15 +0800 Subject: [PATCH 4/6] update --- .../window_functions/test_window_function.out | 18 ++++++++++++++++++ .../test_window_function.groovy | 8 ++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out b/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out index b5d0c2bc8975a1..792acddd3402fd 100644 --- a/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out +++ b/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out @@ -534,6 +534,24 @@ USA Pete Hello 9223372036854775807 1 9223372036854775807 2 +-- !hujie1 -- +\N \N +1 true +2 false +3 false +4 false +5 true +6 true +7 false +8 true +9 true +10 false +11 true +12 false +13 false +14 false +15 true + -- !hujie2 -- \N \N 1 true diff --git a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_function.groovy b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_function.groovy index ed0ae196d48d6f..fc057e7a2e4295 100644 --- a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_function.groovy @@ -336,12 +336,12 @@ suite("test_window_function") { order by a, wjj""" // test_query_last_value - qt_last_value1"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3}) - as wj from baseall order by ${k1}, wj""" - qt_last_value2"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3} + qt_last_value1"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2}) + as wj from baseall order by ${k1}, wj""" + qt_last_value2"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2} range between unbounded preceding and current row) as wj from baseall order by ${k1}, wj""" - qt_last_value3"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3} + qt_last_value3"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2} rows between unbounded preceding and current row) as wj from baseall order by ${k1}, wj""" qt_last_value4"""select a, max(d) as wjj from From 3457d945cebcc23f16bfa39f5b89468bf2586d6e Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 6 Sep 2022 14:26:41 +0800 Subject: [PATCH 5/6] update --- be/src/vec/exec/vsort_node.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 73048f7e1e2091..b9f0e03af108af 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -140,10 +140,9 @@ Status VSortNode::sort_input(RuntimeState* state) { } } while (!eos && _unsorted_block->rows() < BufferedBlockSize && _unsorted_block->allocated_bytes() < BufferedBlockBytes); - Block block = _unsorted_block->to_block(0); - RETURN_IF_ERROR(partial_sort(block)); - - if (block.rows() > 0) { + if (_unsorted_block->rows() > 0) { + Block block = _unsorted_block->to_block(0); + RETURN_IF_ERROR(partial_sort(block)); // dispose TOP-N logic if (_limit != -1) { // Here is a little opt to reduce the mem uasge, we build a max heap From 2786c526fbfdf209dd4666ee68ed8a66b82a48de Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 6 Sep 2022 16:35:58 +0800 Subject: [PATCH 6/6] update --- be/src/vec/exec/vsort_node.cpp | 7 +++---- be/src/vec/exec/vsort_node.h | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index b9f0e03af108af..f12a436d1d3734 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -134,13 +134,12 @@ Status VSortNode::sort_input(RuntimeState* state) { child(0)->get_next_after_projects(state, &upstream_block, &eos), child(0)->get_next_span(), eos); if (upstream_block.rows() != 0) { - size_t mem_usage = upstream_block.allocated_bytes(); - _total_mem_usage += mem_usage; _unsorted_block->merge(upstream_block); } - } while (!eos && _unsorted_block->rows() < BufferedBlockSize && - _unsorted_block->allocated_bytes() < BufferedBlockBytes); + } while (!eos && _unsorted_block->rows() < BUFFERED_BLOCK_SIZE && + _unsorted_block->allocated_bytes() < BUFFERED_BLOCK_BYTES); if (_unsorted_block->rows() > 0) { + _total_mem_usage += _unsorted_block->allocated_bytes(); Block block = _unsorted_block->to_block(0); RETURN_IF_ERROR(partial_sort(block)); // dispose TOP-N logic diff --git a/be/src/vec/exec/vsort_node.h b/be/src/vec/exec/vsort_node.h index e6a4d2700d49c4..1565be5ca6d431 100644 --- a/be/src/vec/exec/vsort_node.h +++ b/be/src/vec/exec/vsort_node.h @@ -87,8 +87,8 @@ class VSortNode : public doris::ExecNode { std::unique_ptr _unsorted_block; - static constexpr size_t BufferedBlockSize = 1024 * 1024; - static constexpr size_t BufferedBlockBytes = 16 << 20; + static constexpr size_t BUFFERED_BLOCK_SIZE = 1024 * 1024; + static constexpr size_t BUFFERED_BLOCK_BYTES = 16 << 20; }; } // namespace doris::vectorized