Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/exec/common/partition_sort_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void PartitionBlocks::create_or_reset_sorter_state() {
if (_partition_topn_sorter == nullptr) {
_previous_row = std::make_unique<SortCursorCmp>();
_partition_topn_sorter = PartitionSorter::create_unique(
*_partition_sort_info->_vsort_exec_exprs, _partition_sort_info->_limit,
*_partition_sort_info->_ordering_expr_ctxs, _partition_sort_info->_limit,
_partition_sort_info->_offset, _partition_sort_info->_pool,
_partition_sort_info->_is_asc_order, _partition_sort_info->_nulls_first,
_partition_sort_info->_row_desc, _partition_sort_info->_runtime_state,
Expand Down
8 changes: 4 additions & 4 deletions be/src/exec/common/partition_sort_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@
#include "exec/common/hash_table/ph_hash_map.h"
#include "exec/common/hash_table/string_hash_map.h"
#include "exec/sort/partition_sorter.h"
#include "exec/sort/vsort_exec_exprs.h"
#include "exprs/vexpr_fwd.h"

namespace doris {

struct PartitionSortInfo {
~PartitionSortInfo() = default;

PartitionSortInfo(VSortExecExprs* vsort_exec_exprs, int64_t limit, int64_t offset,
PartitionSortInfo(const VExprContextSPtrs* ordering_expr_ctxs, int64_t limit, int64_t offset,
ObjectPool* pool, const std::vector<bool>& is_asc_order,
const std::vector<bool>& nulls_first, const RowDescriptor& row_desc,
RuntimeState* runtime_state, RuntimeProfile* runtime_profile,
bool has_global_limit, int64_t partition_inner_limit,
TopNAlgorithm::type top_n_algorithm, TPartTopNPhase::type topn_phase)
: _vsort_exec_exprs(vsort_exec_exprs),
: _ordering_expr_ctxs(ordering_expr_ctxs),
_limit(limit),
_offset(offset),
_pool(pool),
Expand All @@ -54,7 +54,7 @@ struct PartitionSortInfo {
_topn_phase(topn_phase) {}

public:
VSortExecExprs* _vsort_exec_exprs = nullptr;
const VExprContextSPtrs* _ordering_expr_ctxs = nullptr;
int64_t _limit = -1;
int64_t _offset = 0;
ObjectPool* _pool = nullptr;
Expand Down
24 changes: 11 additions & 13 deletions be/src/exec/operator/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "exec/exchange/vdata_stream_mgr.h"
#include "exec/exchange/vdata_stream_recvr.h"
#include "exec/operator/operator.h"
#include "exec/sort/vsort_exec_exprs.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -103,7 +103,10 @@ Status ExchangeLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
auto& p = _parent->cast<ExchangeSourceOperatorX>();
if (p.is_merging()) {
RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, vsort_exec_exprs));
ordering_expr_ctxs.resize(p._ordering_expr_ctxs.size());
for (size_t i = 0; i < p._ordering_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._ordering_expr_ctxs[i]->clone(state, ordering_expr_ctxs[i]));
}
}
return Status::OK();
}
Expand All @@ -124,7 +127,8 @@ Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state
if (!_is_merging) {
return Status::OK();
}
RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.exchange_node.sort_info, _pool));
RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.exchange_node.sort_info.ordering_exprs,
_ordering_expr_ctxs));
_is_asc_order = tnode.exchange_node.sort_info.is_asc_order;
_nulls_first = tnode.exchange_node.sort_info.nulls_first;

Expand All @@ -136,8 +140,8 @@ Status ExchangeSourceOperatorX::prepare(RuntimeState* state) {
DCHECK_GT(_num_senders, 0);

if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, _row_descriptor));
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, _row_descriptor));
RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state));
}
return Status::OK();
}
Expand All @@ -153,8 +157,8 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, Block* block, boo
if (_is_merging && !local_state.is_ready) {
SCOPED_TIMER(local_state.create_merger_timer);
RETURN_IF_ERROR(local_state.stream_recvr->create_merger(
local_state.vsort_exec_exprs.ordering_expr_ctxs(), _is_asc_order, _nulls_first,
state->batch_size(), _limit, _offset));
local_state.ordering_expr_ctxs, _is_asc_order, _nulls_first, state->batch_size(),
_limit, _offset));
local_state.is_ready = true;
return Status::OK();
}
Expand Down Expand Up @@ -209,16 +213,10 @@ Status ExchangeLocalState::close(RuntimeState* state) {
if (stream_recvr != nullptr) {
stream_recvr->close();
}
if (_parent->cast<ExchangeSourceOperatorX>()._is_merging) {
vsort_exec_exprs.close(state);
}
return Base::close(state);
}

Status ExchangeSourceOperatorX::close(RuntimeState* state) {
if (_is_merging && !is_closed()) {
_vsort_exec_exprs.close(state);
}
_is_closed = true;
return OperatorX<ExchangeLocalState>::close(state);
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/operator/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <stdint.h>

#include "exec/operator/operator.h"
#include "exprs/vexpr_fwd.h"

namespace doris {
class ExecNode;
Expand Down Expand Up @@ -71,7 +72,7 @@ class ExchangeLocalState : public PipelineXLocalState<> {

MOCK_FUNCTION void create_stream_recvr(RuntimeState* state);
std::shared_ptr<doris::VDataStreamRecvr> stream_recvr;
doris::VSortExecExprs vsort_exec_exprs;
doris::VExprContextSPtrs ordering_expr_ctxs;
int64_t num_rows_skipped;
bool is_ready;

Expand Down Expand Up @@ -128,7 +129,7 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {

// use in merge sort
size_t _offset;
doris::VSortExecExprs _vsort_exec_exprs;
doris::VExprContextSPtrs _ordering_expr_ctxs;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
};
Expand Down
14 changes: 8 additions & 6 deletions be/src/exec/operator/local_merge_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <string>

#include "exec/operator/operator.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"

namespace doris {

Expand Down Expand Up @@ -60,10 +62,9 @@ std::vector<Dependency*> LocalMergeSortLocalState::dependencies() const {
Status LocalMergeSortLocalState::build_merger(RuntimeState* state) {
auto& p = _parent->cast<LocalMergeSortSourceOperatorX>();
VExprContextSPtrs ordering_expr_ctxs;
ordering_expr_ctxs.resize(p._vsort_exec_exprs.ordering_expr_ctxs().size());
ordering_expr_ctxs.resize(p._ordering_expr_ctxs.size());
for (size_t i = 0; i < ordering_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(
p._vsort_exec_exprs.ordering_expr_ctxs()[i]->clone(state, ordering_expr_ctxs[i]));
RETURN_IF_ERROR(p._ordering_expr_ctxs[i]->clone(state, ordering_expr_ctxs[i]));
}
_merger = std::make_unique<VSortedRunMerger>(ordering_expr_ctxs, p._is_asc_order,
p._nulls_first, state->batch_size(), p._limit,
Expand All @@ -89,7 +90,8 @@ LocalMergeSortSourceOperatorX::LocalMergeSortSourceOperatorX(ObjectPool* pool,

Status LocalMergeSortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(Base::init(tnode, state));
RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool));
RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.sort_node.sort_info.ordering_exprs,
_ordering_expr_ctxs));
_is_asc_order = tnode.sort_node.sort_info.is_asc_order;
_nulls_first = tnode.sort_node.sort_info.nulls_first;
_op_name = "LOCAL_MERGE_SORT_SOURCE_OPERATOR";
Expand All @@ -98,8 +100,8 @@ Status LocalMergeSortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState*

Status LocalMergeSortSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Base::prepare(state));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor));
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, _row_descriptor));
RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state));
init_dependencies_and_sorter();
return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/operator/local_merge_sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/status.h"
#include "exec/operator/operator.h"
#include "exec/pipeline/dependency.h"
#include "exprs/vexpr_fwd.h"

namespace doris {
class RuntimeState;
Expand Down Expand Up @@ -101,7 +102,7 @@ class LocalMergeSortSourceOperatorX final : public OperatorX<LocalMergeSortLocal
const bool _merge_by_exchange;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
VSortExecExprs _vsort_exec_exprs;
VExprContextSPtrs _ordering_expr_ctxs;
const int64_t _offset;

std::vector<DependencySPtr> _other_source_deps;
Expand Down
16 changes: 11 additions & 5 deletions be/src/exec/operator/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "common/status.h"
#include "exec/common/hash_table/hash.h"
#include "exec/operator/partition_sort_source_operator.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"

namespace doris {

Expand All @@ -32,7 +34,10 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<PartitionSortSinkOperatorX>();
RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
_ordering_expr_ctxs.resize(p._ordering_expr_ctxs.size());
for (size_t i = 0; i < p._ordering_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._ordering_expr_ctxs[i]->clone(state, _ordering_expr_ctxs[i]));
}
_partition_expr_ctxs.resize(p._partition_expr_ctxs.size());
for (size_t i = 0; i < p._partition_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, _partition_expr_ctxs[i]));
Expand All @@ -53,7 +58,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_sorted_partition_input_rows_counter =
ADD_COUNTER(custom_profile(), "SortedPartitionInputRows", TUnit::UNIT);
_partition_sort_info = std::make_shared<PartitionSortInfo>(
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first,
&_ordering_expr_ctxs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first,
p._child->row_desc(), state, custom_profile(), p._has_global_limit,
p._partition_inner_limit, p._top_n_algorithm, p._topn_phase);
custom_profile()->add_info_string("PartitionTopNPhase", to_string(p._topn_phase));
Expand Down Expand Up @@ -83,7 +88,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st

//order by key
if (tnode.partition_sort_node.__isset.sort_info) {
RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.partition_sort_node.sort_info, _pool));
RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.partition_sort_node.sort_info.ordering_exprs,
_ordering_expr_ctxs));
_is_asc_order = tnode.partition_sort_node.sort_info.is_asc_order;
_nulls_first = tnode.partition_sort_node.sort_info.nulls_first;
}
Expand All @@ -98,9 +104,9 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st

Status PartitionSortSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<PartitionSortSinkLocalState>::prepare(state));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor));
RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, _row_descriptor));
RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, _child->row_desc()));
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state));
RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
return Status::OK();
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/operator/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "exec/common/partition_sort_utils.h"
#include "exec/operator/operator.h"
#include "exec/sort/partition_sorter.h"
#include "exprs/vexpr_fwd.h"

namespace doris {

Expand All @@ -41,7 +42,7 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSort
friend class PartitionSortSinkOperatorX;

// Expressions and parameters used for build _sort_description
VSortExecExprs _vsort_exec_exprs;
VExprContextSPtrs _ordering_expr_ctxs;
VExprContextSPtrs _partition_expr_ctxs;
int64_t _sorted_partition_input_rows = 0;
std::vector<PartitionDataPtr> _value_places;
Expand Down Expand Up @@ -115,7 +116,7 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortS
VExprContextSPtrs _partition_expr_ctxs;
const std::vector<TExpr> _distribute_exprs;
// Expressions and parameters used for build _sort_description
VSortExecExprs _vsort_exec_exprs;
VExprContextSPtrs _ordering_expr_ctxs;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;

Expand Down
20 changes: 13 additions & 7 deletions be/src/exec/operator/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "exec/operator/operator.h"
#include "exec/sort/heap_sorter.h"
#include "exec/sort/topn_sorter.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "runtime/query_context.h"

namespace doris {
Expand All @@ -43,23 +45,26 @@ Status SortSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
auto& p = _parent->cast<SortSinkOperatorX>();

RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
_ordering_expr_ctxs.resize(p._ordering_expr_ctxs.size());
for (size_t i = 0; i < p._ordering_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._ordering_expr_ctxs[i]->clone(state, _ordering_expr_ctxs[i]));
}
switch (p._algorithm) {
case TSortAlgorithm::HEAP_SORT: {
_shared_state->sorter = HeapSorter::create_shared(
_vsort_exec_exprs, state, p._limit, p._offset, p._pool, p._is_asc_order,
_ordering_expr_ctxs, state, p._limit, p._offset, p._pool, p._is_asc_order,
p._nulls_first, p._child->row_desc(),
state->get_query_ctx()->has_runtime_predicate(p._node_id));
break;
}
case TSortAlgorithm::TOPN_SORT: {
_shared_state->sorter = TopNSorter::create_shared(
_vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first,
_ordering_expr_ctxs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first,
p._child->row_desc(), state, custom_profile());
break;
}
case TSortAlgorithm::FULL_SORT: {
auto sorter = FullSorter::create_shared(_vsort_exec_exprs, p._limit, p._offset, p._pool,
auto sorter = FullSorter::create_shared(_ordering_expr_ctxs, p._limit, p._offset, p._pool,
p._is_asc_order, p._nulls_first,
p._child->row_desc(), state, custom_profile());
if (p._max_buffered_bytes > 0) {
Expand Down Expand Up @@ -107,7 +112,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, int dest

Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool));
RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.sort_node.sort_info.ordering_exprs,
_ordering_expr_ctxs));
_is_asc_order = tnode.sort_node.sort_info.is_asc_order;
_nulls_first = tnode.sort_node.sort_info.nulls_first;

Expand All @@ -121,8 +127,8 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {

Status SortSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<SortSinkLocalState>::prepare(state));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor));
return _vsort_exec_exprs.open(state);
RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, _row_descriptor));
return VExpr::open(_ordering_expr_ctxs, state);
}

Status SortSinkOperatorX::sink(doris::RuntimeState* state, Block* in_block, bool eos) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/operator/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "core/field.h"
#include "exec/operator/operator.h"
#include "exprs/vexpr_fwd.h"

namespace doris {

Expand All @@ -42,7 +43,7 @@ class SortSinkLocalState : public PipelineXSinkLocalState<SortSharedState> {
friend class SortSinkOperatorX;

// Expressions and parameters used for build _sort_description
VSortExecExprs _vsort_exec_exprs;
VExprContextSPtrs _ordering_expr_ctxs;

RuntimeProfile::Counter* _sort_blocks_memory_usage = nullptr;

Expand Down Expand Up @@ -112,7 +113,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
ObjectPool* _pool = nullptr;

// Expressions and parameters used for build _sort_description
VSortExecExprs _vsort_exec_exprs;
VExprContextSPtrs _ordering_expr_ctxs;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;

Expand Down
10 changes: 6 additions & 4 deletions be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "exec/spill/spill_file_manager.h"
#include "exec/spill/spill_file_reader.h"
#include "exec/spill/spill_file_writer.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"

Expand All @@ -34,13 +36,13 @@ Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile,
_row_desc = row_desc;

// Initialize sort expressions from sort_info (contains ordering columns, asc/desc, nulls first/last)
RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc));
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
RETURN_IF_ERROR(VExpr::create_expr_trees(_sort_info.ordering_exprs, _ordering_expr_ctxs));
RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, *row_desc));
RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state));

// Create FullSorter for in-memory sorting with spill support enabled.
// Parameters: limit=-1 (no limit), offset=0 (no offset)
_sorter = FullSorter::create_unique(_vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order,
_sorter = FullSorter::create_unique(_ordering_expr_ctxs, -1, 0, &_pool, _sort_info.is_asc_order,
_sort_info.nulls_first, *row_desc, state, _profile);
_sorter->init_profile(_profile);
// Enable spill support so the sorter can be used with the spill framework
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class VIcebergSortWriter : public IPartitionWriterBase {
const RowDescriptor* _row_desc = nullptr;
ObjectPool _pool;
TSortInfo _sort_info;
VSortExecExprs _vsort_exec_exprs;
VExprContextSPtrs _ordering_expr_ctxs;
// The underlying partition writer that handles actual Parquet/ORC file I/O
std::shared_ptr<VIcebergPartitionWriter> _iceberg_partition_writer;
// Lambda for creating new writers when file splitting occurs
Expand Down
Loading
Loading