Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/vec/aggregate_functions/aggregate_function_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct AggregateFunctionSortData {
}
}

void sort() { sort_block(block, sort_desc, block.rows()); }
void sort() { sort_block(block, block, sort_desc, block.rows()); }
};

template <typename Data>
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/common/sort/heap_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs
_topn_filter_rows(0),
_init_sort_descs(false) {}

Status HeapSorter::append_block(Block* block, bool* mem_reuse) {
Status HeapSorter::append_block(Block* block) {
DCHECK(block->rows() > 0);
{
SCOPED_TIMER(_materialize_timer);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/common/sort/heap_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class HeapSorter final : public Sorter {
_materialize_timer = ADD_TIMER(runtime_profile, "MaterializeTime");
}

Status append_block(Block* block, bool* mem_reuse) override;
Status append_block(Block* block) override;

Status prepare_for_read() override;

Expand Down
50 changes: 33 additions & 17 deletions be/src/vec/common/sort/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,27 @@ Status MergeSorterState::merge_sort_read(doris::RuntimeState* state,
return Status::OK();
}

Status Sorter::partial_sort(Block& block) {
if (_vsort_exec_exprs.need_materialize_tuple()) {
Status Sorter::partial_sort(Block& src_block, Block& dest_block) {
size_t num_cols = src_block.columns();
if (_materialize_sort_exprs) {
auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&block, &valid_column_ids[i]));
RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block, &valid_column_ids[i]));
}

Block new_block;
for (auto column_id : valid_column_ids) {
new_block.insert(block.get_by_position(column_id));
new_block.insert(src_block.get_by_position(column_id));
}
block.swap(new_block);
dest_block.swap(new_block);
}

_sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size());
Block* result_block = _materialize_sort_exprs ? &dest_block : &src_block;
for (int i = 0; i < _sort_description.size(); i++) {
const auto& ordering_expr = _vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
RETURN_IF_ERROR(ordering_expr->execute(&block, &_sort_description[i].column_number));
RETURN_IF_ERROR(ordering_expr->execute(result_block, &_sort_description[i].column_number));

_sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
_sort_description[i].nulls_direction =
Expand All @@ -99,7 +101,12 @@ Status Sorter::partial_sort(Block& block) {

{
SCOPED_TIMER(_partial_sort_timer);
sort_block(block, _sort_description, _offset + _limit);
if (_materialize_sort_exprs) {
sort_block(dest_block, dest_block, _sort_description, _offset + _limit);
} else {
sort_block(src_block, dest_block, _sort_description, _offset + _limit);
}
src_block.clear_column_data(num_cols);
}

return Status::OK();
Expand All @@ -111,11 +118,19 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first),
_state(std::unique_ptr<MergeSorterState>(new MergeSorterState(row_desc, offset))) {}

Status FullSorter::append_block(Block* block, bool* mem_reuse) {
Status FullSorter::append_block(Block* block) {
DCHECK(block->rows() > 0);
{
SCOPED_TIMER(_merge_block_timer);
_state->unsorted_block->merge(*block);
auto& data = _state->unsorted_block->get_columns_with_type_and_name();
const auto& arrival_data = block->get_columns_with_type_and_name();
auto sz = block->rows();
for (int i = 0; i < data.size(); ++i) {
DCHECK(data[i].type->equals(*(arrival_data[i].type)));
data[i].column->assume_mutable()->insert_range_from(
*arrival_data[i].column->convert_to_full_column_if_const().get(), 0, sz);
}
block->clear_column_data();
}
if (_reach_limit()) {
RETURN_IF_ERROR(_do_sort());
Expand Down Expand Up @@ -147,32 +162,33 @@ Status FullSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
}

Status FullSorter::_do_sort() {
Block block = _state->unsorted_block->to_block(0);
RETURN_IF_ERROR(partial_sort(block));
Block* src_block = _state->unsorted_block.get();
Block desc_block = src_block->clone_without_columns();
RETURN_IF_ERROR(partial_sort(*src_block, desc_block));

// dispose TOP-N logic
if (_limit != -1) {
// Here is a little opt to reduce the mem uasge, we build a max heap
// to order the block in _block_priority_queue.
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_state->num_rows < _limit) {
_state->sorted_blocks.emplace_back(std::move(block));
_state->num_rows += block.rows();
_state->num_rows += desc_block.rows();
_state->sorted_blocks.emplace_back(std::move(desc_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->sorted_blocks.back(), _sort_description)));
} else {
MergeSortBlockCursor block_cursor(
_pool->add(new MergeSortCursorImpl(block, _sort_description)));
_pool->add(new MergeSortCursorImpl(desc_block, _sort_description)));
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
_state->sorted_blocks.emplace_back(std::move(block));
_state->sorted_blocks.emplace_back(std::move(desc_block));
_block_priority_queue.push(block_cursor);
}
}
} else {
// dispose normal sort logic
_state->sorted_blocks.emplace_back(std::move(block));
_state->sorted_blocks.emplace_back(std::move(desc_block));
}
_state->reset_block();
return Status::OK();
}

Expand Down
24 changes: 9 additions & 15 deletions be/src/vec/common/sort/sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,23 @@ namespace doris::vectorized {
class MergeSorterState {
public:
MergeSorterState(const RowDescriptor& row_desc, int64_t offset)
: unsorted_block(new MutableBlock(
VectorizedUtils::create_empty_columnswithtypename(row_desc))),
_offset(offset),
_row_desc(row_desc) {}
: unsorted_block(new Block(VectorizedUtils::create_empty_block(row_desc))),
_offset(offset) {}

~MergeSorterState() = default;

void reset_block() {
unsorted_block.reset(
new MutableBlock(VectorizedUtils::create_empty_columnswithtypename(_row_desc)));
}

void build_merge_tree(SortDescription& sort_description);

Status merge_sort_read(doris::RuntimeState* state, doris::vectorized::Block* block, bool* eos);

std::priority_queue<MergeSortCursor> priority_queue;
std::vector<MergeSortCursorImpl> cursors;
std::unique_ptr<MutableBlock> unsorted_block;
std::unique_ptr<Block> unsorted_block;
std::vector<Block> sorted_blocks;
uint64_t num_rows = 0;

private:
int64_t _offset;
const RowDescriptor& _row_desc;
};

class Sorter {
Expand All @@ -67,7 +59,8 @@ class Sorter {
_offset(offset),
_pool(pool),
_is_asc_order(is_asc_order),
_nulls_first(nulls_first) {}
_nulls_first(nulls_first),
_materialize_sort_exprs(vsort_exec_exprs.need_materialize_tuple()) {}

virtual ~Sorter() = default;

Expand All @@ -76,14 +69,14 @@ class Sorter {
_merge_block_timer = ADD_TIMER(runtime_profile, "MergeBlockTime");
};

virtual Status append_block(Block* block, bool* mem_reuse) = 0;
virtual Status append_block(Block* block) = 0;

virtual Status prepare_for_read() = 0;

virtual Status get_next(RuntimeState* state, Block* block, bool* eos) = 0;

protected:
Status partial_sort(Block& block);
Status partial_sort(Block& src_block, Block& dest_block);

SortDescription _sort_description;
VSortExecExprs& _vsort_exec_exprs;
Expand All @@ -97,6 +90,7 @@ class Sorter {
RuntimeProfile::Counter* _merge_block_timer = nullptr;

std::priority_queue<MergeSortBlockCursor> _block_priority_queue;
bool _materialize_sort_exprs;
};

class FullSorter final : public Sorter {
Expand All @@ -107,7 +101,7 @@ class FullSorter final : public Sorter {

~FullSorter() override = default;

Status append_block(Block* block, bool* mem_reuse) override;
Status append_block(Block* block) override;

Status prepare_for_read() override;

Expand Down
28 changes: 10 additions & 18 deletions be/src/vec/common/sort/topn_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs
ObjectPool* pool, std::vector<bool>& is_asc_order,
std::vector<bool>& nulls_first, const RowDescriptor& row_desc)
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first),
_state(std::unique_ptr<MergeSorterState>(new MergeSorterState(row_desc, offset))) {}
_state(std::unique_ptr<MergeSorterState>(new MergeSorterState(row_desc, offset))),
_row_desc(row_desc) {}

Status TopNSorter::append_block(Block* block, bool* mem_reuse) {
Status TopNSorter::append_block(Block* block) {
DCHECK(block->rows() > 0);
RETURN_IF_ERROR(_do_sort(block, mem_reuse));
RETURN_IF_ERROR(_do_sort(block));
return Status::OK();
}

Expand All @@ -51,40 +52,31 @@ Status TopNSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
return Status::OK();
}

Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) {
*mem_reuse = false;
RETURN_IF_ERROR(partial_sort(*block));
Status TopNSorter::_do_sort(Block* block) {
Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc);
RETURN_IF_ERROR(partial_sort(*block, sorted_block));

// dispose TOP-N logic
if (_limit != -1) {
// Here is a little opt to reduce the mem uasge, we build a max heap
// to order the block in _block_priority_queue.
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_state->num_rows < _limit) {
Block sorted_block;
sorted_block.swap(*block);
_state->sorted_blocks.emplace_back(std::move(sorted_block));
_state->num_rows += sorted_block.rows();
_state->sorted_blocks.emplace_back(std::move(sorted_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->sorted_blocks.back(), _sort_description)));
} else {
Block sorted_block;
sorted_block.swap(*block);
MergeSortBlockCursor block_cursor(
_pool->add(new MergeSortCursorImpl(sorted_block, _sort_description)));
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
_state->sorted_blocks.emplace_back(std::move(sorted_block));
_block_priority_queue.push(block_cursor);
} else {
*mem_reuse = true;
block->clear_column_data();
}
}
} else {
Block sorted_block;
sorted_block.swap(*block);
// dispose normal sort logic
_state->sorted_blocks.emplace_back(std::move(sorted_block));
return Status::InternalError("Should not reach TopN sorter for full sort query");
}
return Status::OK();
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/common/sort/topn_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TopNSorter final : public Sorter {

~TopNSorter() override = default;

Status append_block(Block* block, bool* mem_reuse) override;
Status append_block(Block* block) override;

Status prepare_for_read() override;

Expand All @@ -39,9 +39,10 @@ class TopNSorter final : public Sorter {
static constexpr size_t TOPN_SORT_THRESHOLD = 256;

private:
Status _do_sort(Block* block, bool* mem_reuse);
Status _do_sort(Block* block);

std::unique_ptr<MergeSorterState> _state;
const RowDescriptor& _row_desc;
};

} // namespace doris::vectorized
24 changes: 14 additions & 10 deletions be/src/vec/core/sort_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ struct PartialSortingLess {
}
};

void sort_block(Block& block, const SortDescription& description, UInt64 limit) {
if (!block) {
void sort_block(Block& src_block, Block& dest_block, const SortDescription& description,
UInt64 limit) {
if (!src_block) {
return;
}

Expand All @@ -74,18 +75,19 @@ void sort_block(Block& block, const SortDescription& description, UInt64 limit)

const IColumn* column =
!description[0].column_name.empty()
? block.get_by_name(description[0].column_name).column.get()
: block.safe_get_by_position(description[0].column_number).column.get();
? src_block.get_by_name(description[0].column_name).column.get()
: src_block.safe_get_by_position(description[0].column_number).column.get();

IColumn::Permutation perm;
column->get_permutation(reverse, limit, description[0].nulls_direction, perm);

size_t columns = block.columns();
size_t columns = src_block.columns();
for (size_t i = 0; i < columns; ++i) {
block.get_by_position(i).column = block.get_by_position(i).column->permute(perm, limit);
dest_block.replace_by_position(
i, src_block.get_by_position(i).column->permute(perm, limit));
}
} else {
size_t size = block.rows();
size_t size = src_block.rows();
IColumn::Permutation perm(size);
for (size_t i = 0; i < size; ++i) {
perm[i] = i;
Expand All @@ -96,20 +98,22 @@ void sort_block(Block& block, const SortDescription& description, UInt64 limit)
}

ColumnsWithSortDescriptions columns_with_sort_desc =
get_columns_with_sort_description(block, description);
get_columns_with_sort_description(src_block, description);
{
EqualFlags flags(size, 1);
EqualRange range {0, size};

// TODO: ColumnSorter should be constructed only once.
for (size_t i = 0; i < columns_with_sort_desc.size(); i++) {
ColumnSorter sorter(columns_with_sort_desc[i], limit);
sorter.operator()(flags, perm, range, i == columns_with_sort_desc.size() - 1);
}
}

size_t columns = block.columns();
size_t columns = src_block.columns();
for (size_t i = 0; i < columns; ++i) {
block.get_by_position(i).column = block.get_by_position(i).column->permute(perm, limit);
dest_block.replace_by_position(
i, src_block.get_by_position(i).column->permute(perm, limit));
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/core/sort_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
namespace doris::vectorized {

/// Sort one block by `description`. If limit != 0, then the partial sort of the first `limit` rows is produced.
void sort_block(Block& block, const SortDescription& description, UInt64 limit = 0);
void sort_block(Block& src_block, Block& dest_block, const SortDescription& description,
UInt64 limit = 0);

/** Used only in StorageMergeTree to sort the data with INSERT.
* Sorting is stable. This is important for keeping the order of rows in the CollapsingMergeTree engine
Expand Down
Loading