Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions be/src/exec/operator/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ Status SortSinkLocalState::open(RuntimeState* state) {
}
switch (p._algorithm) {
case TSortAlgorithm::HEAP_SORT: {
_shared_state->sorter = HeapSorter::create_shared(
_ordering_expr_ctxs, state, p._limit, p._offset, p._pool, p._is_asc_order,
p._nulls_first, p._child->row_desc(),
state->get_query_ctx()->has_runtime_predicate(p._node_id));
_shared_state->sorter =
HeapSorter::create_shared(_ordering_expr_ctxs, state, p._limit, p._offset, p._pool,
p._is_asc_order, p._nulls_first, p._child->row_desc());
break;
}
case TSortAlgorithm::TOPN_SORT: {
Expand Down
7 changes: 3 additions & 4 deletions be/src/exec/sort/heap_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ namespace doris {
HeapSorter::HeapSorter(const VExprContextSPtrs& ordering_expr_ctxs, RuntimeState* state,
int64_t limit, int64_t offset, ObjectPool* pool,
std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first,
const RowDescriptor& row_desc, bool have_runtime_predicate)
const RowDescriptor& row_desc)
: Sorter(ordering_expr_ctxs, state, limit, offset, pool, is_asc_order, nulls_first),
_heap_size(limit + offset),
_state(MergeSorterState::create_unique(row_desc, offset)),
_have_runtime_predicate(have_runtime_predicate) {}
_state(MergeSorterState::create_unique(row_desc, offset)) {}

Status HeapSorter::append_block(Block* block) {
auto tmp_block = std::make_shared<Block>(block->clone_empty());
if (!_have_runtime_predicate && _queue.is_valid() && _queue_row_num >= _heap_size) {
if (_queue.is_valid() && _queue_row_num >= _heap_size) {
RETURN_IF_ERROR(_prepare_sort_columns(*block, *tmp_block, false));
tmp_block->swap(*block);
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(tmp_block, _sort_description);
Expand Down
4 changes: 1 addition & 3 deletions be/src/exec/sort/heap_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ class HeapSorter final : public Sorter {
public:
HeapSorter(const VExprContextSPtrs& ordering_expr_ctxs, RuntimeState* state, int64_t limit,
int64_t offset, ObjectPool* pool, std::vector<bool>& is_asc_order,
std::vector<bool>& nulls_first, const RowDescriptor& row_desc,
bool have_runtime_predicate = true);
std::vector<bool>& nulls_first, const RowDescriptor& row_desc);

~HeapSorter() override = default;

Expand Down Expand Up @@ -56,7 +55,6 @@ class HeapSorter final : public Sorter {
MergeSorterQueue _queue;
std::unique_ptr<MergeSorterState> _state;
IColumn::Permutation _reverse_buffer;
bool _have_runtime_predicate = true;
RuntimeProfile::Counter* _topn_filter_timer = nullptr;
RuntimeProfile::Counter* _topn_filter_rows_counter = nullptr;
};
Expand Down
6 changes: 2 additions & 4 deletions be/test/exec/sort/heap_sorter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,7 @@ TEST_F(HeapSorterTest, test_topn_sorter1) {
EXPECT_EQ(sorter->_queue_row_num, 6);

auto value = sorter->get_top_value();
Field real;
block.get_by_position(0).column->get(0, real);
EXPECT_EQ(value, real);
EXPECT_EQ(value, Field::create_field<TYPE_BIGINT>(Int64(6)));
}

EXPECT_TRUE(sorter->prepare_for_read(false));
Expand All @@ -117,4 +115,4 @@ TEST_F(HeapSorterTest, test_topn_sorter1) {
}
}

} // namespace doris
} // namespace doris
Loading