diff --git a/be/src/exec/common/partition_sort_utils.cpp b/be/src/exec/common/partition_sort_utils.cpp index ba7c5fa5808df0..09f834532d5940 100644 --- a/be/src/exec/common/partition_sort_utils.cpp +++ b/be/src/exec/common/partition_sort_utils.cpp @@ -54,7 +54,7 @@ void PartitionBlocks::create_or_reset_sorter_state() { if (_partition_topn_sorter == nullptr) { _previous_row = std::make_unique(); _partition_topn_sorter = PartitionSorter::create_unique( - *_partition_sort_info->_vsort_exec_exprs, _partition_sort_info->_limit, + *_partition_sort_info->_ordering_expr_ctxs, _partition_sort_info->_limit, _partition_sort_info->_offset, _partition_sort_info->_pool, _partition_sort_info->_is_asc_order, _partition_sort_info->_nulls_first, _partition_sort_info->_row_desc, _partition_sort_info->_runtime_state, diff --git a/be/src/exec/common/partition_sort_utils.h b/be/src/exec/common/partition_sort_utils.h index f9edbb39e448c0..b4cfb0809fda7e 100644 --- a/be/src/exec/common/partition_sort_utils.h +++ b/be/src/exec/common/partition_sort_utils.h @@ -26,20 +26,20 @@ #include "exec/common/hash_table/ph_hash_map.h" #include "exec/common/hash_table/string_hash_map.h" #include "exec/sort/partition_sorter.h" -#include "exec/sort/vsort_exec_exprs.h" +#include "exprs/vexpr_fwd.h" namespace doris { struct PartitionSortInfo { ~PartitionSortInfo() = default; - PartitionSortInfo(VSortExecExprs* vsort_exec_exprs, int64_t limit, int64_t offset, + PartitionSortInfo(const VExprContextSPtrs* ordering_expr_ctxs, int64_t limit, int64_t offset, ObjectPool* pool, const std::vector& is_asc_order, const std::vector& nulls_first, const RowDescriptor& row_desc, RuntimeState* runtime_state, RuntimeProfile* runtime_profile, bool has_global_limit, int64_t partition_inner_limit, TopNAlgorithm::type top_n_algorithm, TPartTopNPhase::type topn_phase) - : _vsort_exec_exprs(vsort_exec_exprs), + : _ordering_expr_ctxs(ordering_expr_ctxs), _limit(limit), _offset(offset), _pool(pool), @@ -54,7 +54,7 @@ struct PartitionSortInfo { _topn_phase(topn_phase) {} public: - VSortExecExprs* _vsort_exec_exprs = nullptr; + const VExprContextSPtrs* _ordering_expr_ctxs = nullptr; int64_t _limit = -1; int64_t _offset = 0; ObjectPool* _pool = nullptr; diff --git a/be/src/exec/operator/exchange_source_operator.cpp b/be/src/exec/operator/exchange_source_operator.cpp index 462c9e5860a5e3..e008d599078d77 100644 --- a/be/src/exec/operator/exchange_source_operator.cpp +++ b/be/src/exec/operator/exchange_source_operator.cpp @@ -25,7 +25,7 @@ #include "exec/exchange/vdata_stream_mgr.h" #include "exec/exchange/vdata_stream_recvr.h" #include "exec/operator/operator.h" -#include "exec/sort/vsort_exec_exprs.h" +#include "exprs/vexpr.h" #include "exprs/vexpr_context.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -103,7 +103,10 @@ Status ExchangeLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast(); if (p.is_merging()) { - RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, vsort_exec_exprs)); + ordering_expr_ctxs.resize(p._ordering_expr_ctxs.size()); + for (size_t i = 0; i < p._ordering_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._ordering_expr_ctxs[i]->clone(state, ordering_expr_ctxs[i])); + } } return Status::OK(); } @@ -124,7 +127,8 @@ Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state if (!_is_merging) { return Status::OK(); } - RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.exchange_node.sort_info, _pool)); + RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.exchange_node.sort_info.ordering_exprs, + _ordering_expr_ctxs)); _is_asc_order = tnode.exchange_node.sort_info.is_asc_order; _nulls_first = tnode.exchange_node.sort_info.nulls_first; @@ -136,8 +140,8 @@ Status ExchangeSourceOperatorX::prepare(RuntimeState* state) { DCHECK_GT(_num_senders, 0); if (_is_merging) { - RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, _row_descriptor)); - RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, _row_descriptor)); + RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state)); } return Status::OK(); } @@ -153,8 +157,8 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, Block* block, boo if (_is_merging && !local_state.is_ready) { SCOPED_TIMER(local_state.create_merger_timer); RETURN_IF_ERROR(local_state.stream_recvr->create_merger( - local_state.vsort_exec_exprs.ordering_expr_ctxs(), _is_asc_order, _nulls_first, - state->batch_size(), _limit, _offset)); + local_state.ordering_expr_ctxs, _is_asc_order, _nulls_first, state->batch_size(), + _limit, _offset)); local_state.is_ready = true; return Status::OK(); } @@ -209,16 +213,10 @@ Status ExchangeLocalState::close(RuntimeState* state) { if (stream_recvr != nullptr) { stream_recvr->close(); } - if (_parent->cast()._is_merging) { - vsort_exec_exprs.close(state); - } return Base::close(state); } Status ExchangeSourceOperatorX::close(RuntimeState* state) { - if (_is_merging && !is_closed()) { - _vsort_exec_exprs.close(state); - } _is_closed = true; return OperatorX::close(state); } diff --git a/be/src/exec/operator/exchange_source_operator.h b/be/src/exec/operator/exchange_source_operator.h index e3ac3b3fe2bae8..da00e088586437 100644 --- a/be/src/exec/operator/exchange_source_operator.h +++ b/be/src/exec/operator/exchange_source_operator.h @@ -20,6 +20,7 @@ #include #include "exec/operator/operator.h" +#include "exprs/vexpr_fwd.h" namespace doris { class ExecNode; @@ -71,7 +72,7 @@ class ExchangeLocalState : public PipelineXLocalState<> { MOCK_FUNCTION void create_stream_recvr(RuntimeState* state); std::shared_ptr stream_recvr; - doris::VSortExecExprs vsort_exec_exprs; + doris::VExprContextSPtrs ordering_expr_ctxs; int64_t num_rows_skipped; bool is_ready; @@ -128,7 +129,7 @@ class ExchangeSourceOperatorX final : public OperatorX { // use in merge sort size_t _offset; - doris::VSortExecExprs _vsort_exec_exprs; + doris::VExprContextSPtrs _ordering_expr_ctxs; std::vector _is_asc_order; std::vector _nulls_first; }; diff --git a/be/src/exec/operator/local_merge_sort_source_operator.cpp b/be/src/exec/operator/local_merge_sort_source_operator.cpp index 8fbc87ff598f5a..dacd97357d391e 100644 --- a/be/src/exec/operator/local_merge_sort_source_operator.cpp +++ b/be/src/exec/operator/local_merge_sort_source_operator.cpp @@ -22,6 +22,8 @@ #include #include "exec/operator/operator.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" namespace doris { @@ -60,10 +62,9 @@ std::vector LocalMergeSortLocalState::dependencies() const { Status LocalMergeSortLocalState::build_merger(RuntimeState* state) { auto& p = _parent->cast(); VExprContextSPtrs ordering_expr_ctxs; - ordering_expr_ctxs.resize(p._vsort_exec_exprs.ordering_expr_ctxs().size()); + ordering_expr_ctxs.resize(p._ordering_expr_ctxs.size()); for (size_t i = 0; i < ordering_expr_ctxs.size(); i++) { - RETURN_IF_ERROR( - p._vsort_exec_exprs.ordering_expr_ctxs()[i]->clone(state, ordering_expr_ctxs[i])); + RETURN_IF_ERROR(p._ordering_expr_ctxs[i]->clone(state, ordering_expr_ctxs[i])); } _merger = std::make_unique(ordering_expr_ctxs, p._is_asc_order, p._nulls_first, state->batch_size(), p._limit, @@ -89,7 +90,8 @@ LocalMergeSortSourceOperatorX::LocalMergeSortSourceOperatorX(ObjectPool* pool, Status LocalMergeSortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(Base::init(tnode, state)); - RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool)); + RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.sort_node.sort_info.ordering_exprs, + _ordering_expr_ctxs)); _is_asc_order = tnode.sort_node.sort_info.is_asc_order; _nulls_first = tnode.sort_node.sort_info.nulls_first; _op_name = "LOCAL_MERGE_SORT_SOURCE_OPERATOR"; @@ -98,8 +100,8 @@ Status LocalMergeSortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* Status LocalMergeSortSourceOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(Base::prepare(state)); - RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); - RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, _row_descriptor)); + RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state)); init_dependencies_and_sorter(); return Status::OK(); } diff --git a/be/src/exec/operator/local_merge_sort_source_operator.h b/be/src/exec/operator/local_merge_sort_source_operator.h index 924f488cd80cef..6219edc4224983 100644 --- a/be/src/exec/operator/local_merge_sort_source_operator.h +++ b/be/src/exec/operator/local_merge_sort_source_operator.h @@ -22,6 +22,7 @@ #include "common/status.h" #include "exec/operator/operator.h" #include "exec/pipeline/dependency.h" +#include "exprs/vexpr_fwd.h" namespace doris { class RuntimeState; @@ -101,7 +102,7 @@ class LocalMergeSortSourceOperatorX final : public OperatorX _is_asc_order; std::vector _nulls_first; - VSortExecExprs _vsort_exec_exprs; + VExprContextSPtrs _ordering_expr_ctxs; const int64_t _offset; std::vector _other_source_deps; diff --git a/be/src/exec/operator/partition_sort_sink_operator.cpp b/be/src/exec/operator/partition_sort_sink_operator.cpp index 3ed467c7594680..ebbe31e1120a18 100644 --- a/be/src/exec/operator/partition_sort_sink_operator.cpp +++ b/be/src/exec/operator/partition_sort_sink_operator.cpp @@ -24,6 +24,8 @@ #include "common/status.h" #include "exec/common/hash_table/hash.h" #include "exec/operator/partition_sort_source_operator.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" namespace doris { @@ -32,7 +34,10 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); auto& p = _parent->cast(); - RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs)); + _ordering_expr_ctxs.resize(p._ordering_expr_ctxs.size()); + for (size_t i = 0; i < p._ordering_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._ordering_expr_ctxs[i]->clone(state, _ordering_expr_ctxs[i])); + } _partition_expr_ctxs.resize(p._partition_expr_ctxs.size()); for (size_t i = 0; i < p._partition_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, _partition_expr_ctxs[i])); @@ -53,7 +58,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _sorted_partition_input_rows_counter = ADD_COUNTER(custom_profile(), "SortedPartitionInputRows", TUnit::UNIT); _partition_sort_info = std::make_shared( - &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, + &_ordering_expr_ctxs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, p._child->row_desc(), state, custom_profile(), p._has_global_limit, p._partition_inner_limit, p._top_n_algorithm, p._topn_phase); custom_profile()->add_info_string("PartitionTopNPhase", to_string(p._topn_phase)); @@ -83,7 +88,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st //order by key if (tnode.partition_sort_node.__isset.sort_info) { - RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.partition_sort_node.sort_info, _pool)); + RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.partition_sort_node.sort_info.ordering_exprs, + _ordering_expr_ctxs)); _is_asc_order = tnode.partition_sort_node.sort_info.is_asc_order; _nulls_first = tnode.partition_sort_node.sort_info.nulls_first; } @@ -98,9 +104,9 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st Status PartitionSortSinkOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::prepare(state)); - RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); + RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, _row_descriptor)); RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, _child->row_desc())); - RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state)); RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state)); return Status::OK(); } diff --git a/be/src/exec/operator/partition_sort_sink_operator.h b/be/src/exec/operator/partition_sort_sink_operator.h index 3d8822e71cc9d6..48b709d3f1f0cd 100644 --- a/be/src/exec/operator/partition_sort_sink_operator.h +++ b/be/src/exec/operator/partition_sort_sink_operator.h @@ -22,6 +22,7 @@ #include "exec/common/partition_sort_utils.h" #include "exec/operator/operator.h" #include "exec/sort/partition_sorter.h" +#include "exprs/vexpr_fwd.h" namespace doris { @@ -41,7 +42,7 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState _value_places; @@ -115,7 +116,7 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX _distribute_exprs; // Expressions and parameters used for build _sort_description - VSortExecExprs _vsort_exec_exprs; + VExprContextSPtrs _ordering_expr_ctxs; std::vector _is_asc_order; std::vector _nulls_first; diff --git a/be/src/exec/operator/sort_sink_operator.cpp b/be/src/exec/operator/sort_sink_operator.cpp index 6ac9b099b01119..9b045da6d8c392 100644 --- a/be/src/exec/operator/sort_sink_operator.cpp +++ b/be/src/exec/operator/sort_sink_operator.cpp @@ -22,6 +22,8 @@ #include "exec/operator/operator.h" #include "exec/sort/heap_sorter.h" #include "exec/sort/topn_sorter.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" #include "runtime/query_context.h" namespace doris { @@ -43,23 +45,26 @@ Status SortSinkLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); auto& p = _parent->cast(); - RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs)); + _ordering_expr_ctxs.resize(p._ordering_expr_ctxs.size()); + for (size_t i = 0; i < p._ordering_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._ordering_expr_ctxs[i]->clone(state, _ordering_expr_ctxs[i])); + } switch (p._algorithm) { case TSortAlgorithm::HEAP_SORT: { _shared_state->sorter = HeapSorter::create_shared( - _vsort_exec_exprs, state, p._limit, p._offset, p._pool, p._is_asc_order, + _ordering_expr_ctxs, state, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, p._child->row_desc(), state->get_query_ctx()->has_runtime_predicate(p._node_id)); break; } case TSortAlgorithm::TOPN_SORT: { _shared_state->sorter = TopNSorter::create_shared( - _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, + _ordering_expr_ctxs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, p._child->row_desc(), state, custom_profile()); break; } case TSortAlgorithm::FULL_SORT: { - auto sorter = FullSorter::create_shared(_vsort_exec_exprs, p._limit, p._offset, p._pool, + auto sorter = FullSorter::create_shared(_ordering_expr_ctxs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, p._child->row_desc(), state, custom_profile()); if (p._max_buffered_bytes > 0) { @@ -107,7 +112,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, int dest Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); - RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool)); + RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.sort_node.sort_info.ordering_exprs, + _ordering_expr_ctxs)); _is_asc_order = tnode.sort_node.sort_info.is_asc_order; _nulls_first = tnode.sort_node.sort_info.nulls_first; @@ -121,8 +127,8 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { Status SortSinkOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::prepare(state)); - RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(), _row_descriptor)); - return _vsort_exec_exprs.open(state); + RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, _row_descriptor)); + return VExpr::open(_ordering_expr_ctxs, state); } Status SortSinkOperatorX::sink(doris::RuntimeState* state, Block* in_block, bool eos) { diff --git a/be/src/exec/operator/sort_sink_operator.h b/be/src/exec/operator/sort_sink_operator.h index 65aec48334e443..eda5bb31b53b1a 100644 --- a/be/src/exec/operator/sort_sink_operator.h +++ b/be/src/exec/operator/sort_sink_operator.h @@ -21,6 +21,7 @@ #include "core/field.h" #include "exec/operator/operator.h" +#include "exprs/vexpr_fwd.h" namespace doris { @@ -42,7 +43,7 @@ class SortSinkLocalState : public PipelineXSinkLocalState { friend class SortSinkOperatorX; // Expressions and parameters used for build _sort_description - VSortExecExprs _vsort_exec_exprs; + VExprContextSPtrs _ordering_expr_ctxs; RuntimeProfile::Counter* _sort_blocks_memory_usage = nullptr; @@ -112,7 +113,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { ObjectPool* _pool = nullptr; // Expressions and parameters used for build _sort_description - VSortExecExprs _vsort_exec_exprs; + VExprContextSPtrs _ordering_expr_ctxs; std::vector _is_asc_order; std::vector _nulls_first; diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp index 8ff23e420784d0..3d36e1f10eb888 100644 --- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp +++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.cpp @@ -20,6 +20,8 @@ #include "exec/spill/spill_file_manager.h" #include "exec/spill/spill_file_reader.h" #include "exec/spill/spill_file_writer.h" +#include "exprs/vexpr.h" +#include "exprs/vexpr_context.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -34,13 +36,13 @@ Status VIcebergSortWriter::open(RuntimeState* state, RuntimeProfile* profile, _row_desc = row_desc; // Initialize sort expressions from sort_info (contains ordering columns, asc/desc, nulls first/last) - RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool)); - RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc, *row_desc)); - RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + RETURN_IF_ERROR(VExpr::create_expr_trees(_sort_info.ordering_exprs, _ordering_expr_ctxs)); + RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, *row_desc)); + RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state)); // Create FullSorter for in-memory sorting with spill support enabled. // Parameters: limit=-1 (no limit), offset=0 (no offset) - _sorter = FullSorter::create_unique(_vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order, + _sorter = FullSorter::create_unique(_ordering_expr_ctxs, -1, 0, &_pool, _sort_info.is_asc_order, _sort_info.nulls_first, *row_desc, state, _profile); _sorter->init_profile(_profile); // Enable spill support so the sorter can be used with the spill framework diff --git a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h index 95ffc5a60e2faf..45858f473c9041 100644 --- a/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h +++ b/be/src/exec/sink/writer/iceberg/viceberg_sort_writer.h @@ -158,7 +158,7 @@ class VIcebergSortWriter : public IPartitionWriterBase { const RowDescriptor* _row_desc = nullptr; ObjectPool _pool; TSortInfo _sort_info; - VSortExecExprs _vsort_exec_exprs; + VExprContextSPtrs _ordering_expr_ctxs; // The underlying partition writer that handles actual Parquet/ORC file I/O std::shared_ptr _iceberg_partition_writer; // Lambda for creating new writers when file splitting occurs diff --git a/be/src/exec/sort/heap_sorter.cpp b/be/src/exec/sort/heap_sorter.cpp index fea5ef6240394a..ab5e898e67a643 100644 --- a/be/src/exec/sort/heap_sorter.cpp +++ b/be/src/exec/sort/heap_sorter.cpp @@ -25,11 +25,11 @@ namespace doris { -HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, RuntimeState* state, int64_t limit, - int64_t offset, ObjectPool* pool, std::vector& is_asc_order, - std::vector& nulls_first, const RowDescriptor& row_desc, - bool have_runtime_predicate) - : Sorter(vsort_exec_exprs, state, limit, offset, pool, is_asc_order, nulls_first), +HeapSorter::HeapSorter(const VExprContextSPtrs& ordering_expr_ctxs, RuntimeState* state, + int64_t limit, int64_t offset, ObjectPool* pool, + std::vector& is_asc_order, std::vector& nulls_first, + const RowDescriptor& row_desc, bool have_runtime_predicate) + : Sorter(ordering_expr_ctxs, state, limit, offset, pool, is_asc_order, nulls_first), _heap_size(limit + offset), _state(MergeSorterState::create_unique(row_desc, offset)), _have_runtime_predicate(have_runtime_predicate) {} @@ -38,11 +38,7 @@ Status HeapSorter::append_block(Block* block) { auto tmp_block = std::make_shared(block->clone_empty()); if (!_have_runtime_predicate && _queue.is_valid() && _queue_row_num >= _heap_size) { RETURN_IF_ERROR(_prepare_sort_columns(*block, *tmp_block, false)); - if (_materialize_sort_exprs) { - block->clear_column_data(); - } else { - tmp_block->swap(*block); - } + tmp_block->swap(*block); auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(tmp_block, _sort_description); size_t num_rows = tmp_block->rows(); _do_filter(*tmp_cursor_impl, num_rows); diff --git a/be/src/exec/sort/heap_sorter.h b/be/src/exec/sort/heap_sorter.h index ba1ea6f14f0c72..de3cfd53ea7ffe 100644 --- a/be/src/exec/sort/heap_sorter.h +++ b/be/src/exec/sort/heap_sorter.h @@ -25,9 +25,10 @@ class HeapSorter final : public Sorter { ENABLE_FACTORY_CREATOR(HeapSorter); public: - HeapSorter(VSortExecExprs& vsort_exec_exprs, RuntimeState* state, int64_t limit, int64_t offset, - ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first, - const RowDescriptor& row_desc, bool have_runtime_predicate = true); + HeapSorter(const VExprContextSPtrs& ordering_expr_ctxs, RuntimeState* state, int64_t limit, + int64_t offset, ObjectPool* pool, std::vector& is_asc_order, + std::vector& nulls_first, const RowDescriptor& row_desc, + bool have_runtime_predicate = true); ~HeapSorter() override = default; diff --git a/be/src/exec/sort/partition_sorter.cpp b/be/src/exec/sort/partition_sorter.cpp index 39384fdade37b4..64422a202c236f 100644 --- a/be/src/exec/sort/partition_sorter.cpp +++ b/be/src/exec/sort/partition_sorter.cpp @@ -32,18 +32,13 @@ class RowDescriptor; class RuntimeProfile; class RuntimeState; -class VSortExecExprs; -} // namespace doris - -namespace doris { - -PartitionSorter::PartitionSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t offset, - ObjectPool* pool, std::vector& is_asc_order, +PartitionSorter::PartitionSorter(const VExprContextSPtrs& ordering_expr_ctxs, int64_t limit, + int64_t offset, ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first, const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile, bool has_global_limit, int64_t partition_inner_limit, TopNAlgorithm::type top_n_algorithm, SortCursorCmp* previous_row) - : Sorter(vsort_exec_exprs, state, limit, offset, pool, is_asc_order, nulls_first), + : Sorter(ordering_expr_ctxs, state, limit, offset, pool, is_asc_order, nulls_first), _state(MergeSorterState::create_unique(row_desc, offset)), _row_desc(row_desc), _partition_inner_limit(partition_inner_limit), diff --git a/be/src/exec/sort/partition_sorter.h b/be/src/exec/sort/partition_sorter.h index e2fcbac2fb8b81..707d992a0d7833 100644 --- a/be/src/exec/sort/partition_sorter.h +++ b/be/src/exec/sort/partition_sorter.h @@ -35,7 +35,6 @@ class RuntimeProfile; class RuntimeState; class Block; -class VSortExecExprs; } // namespace doris namespace doris { @@ -74,7 +73,7 @@ class PartitionSorter final : public Sorter { ENABLE_FACTORY_CREATOR(PartitionSorter); public: - PartitionSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t offset, + PartitionSorter(const VExprContextSPtrs& ordering_expr_ctxs, int64_t limit, int64_t offset, ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first, const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile, bool has_global_limit, diff --git a/be/src/exec/sort/sorter.cpp b/be/src/exec/sort/sorter.cpp index cd07c68578cf41..88160819328ce0 100644 --- a/be/src/exec/sort/sorter.cpp +++ b/be/src/exec/sort/sorter.cpp @@ -28,7 +28,6 @@ #include "common/object_pool.h" #include "core/block/block.h" -#include "core/block/column_with_type_and_name.h" #include "core/column/column.h" #include "core/column/column_nullable.h" #include "core/data_type/data_type.h" @@ -141,8 +140,7 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block, bool reversed) { SCOPED_TIMER(_partial_sort_timer); uint64_t limit = reversed ? 0 : (_offset + _limit); - sort_block(_materialize_sort_exprs ? dest_block : src_block, dest_block, _sort_description, - _hybrid_sorter, limit); + sort_block(src_block, dest_block, _sort_description, _hybrid_sorter, limit); } src_block.clear_column_data(num_cols); @@ -150,22 +148,10 @@ Status Sorter::partial_sort(Block& src_block, Block& dest_block, bool reversed) } Status Sorter::_prepare_sort_columns(Block& src_block, Block& dest_block, bool reversed) { - if (_materialize_sort_exprs) { - auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs(); - ColumnsWithTypeAndName columns_data(output_tuple_expr_ctxs.size()); - for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) { - RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block, columns_data[i])); - } - - Block new_block {columns_data}; - dest_block.swap(new_block); - } - - _sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size()); - Block* result_block = _materialize_sort_exprs ? &dest_block : &src_block; + _sort_description.resize(_ordering_expr_ctxs.size()); for (int i = 0; i < _sort_description.size(); i++) { - const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i]; - RETURN_IF_ERROR(ordering_expr->execute(result_block, &_sort_description[i].column_number)); + const auto& ordering_expr = _ordering_expr_ctxs[i]; + RETURN_IF_ERROR(ordering_expr->execute(&src_block, &_sort_description[i].column_number)); _sort_description[i].direction = _is_asc_order[i] ? 1 : -1; _sort_description[i].nulls_direction = @@ -177,11 +163,11 @@ Status Sorter::_prepare_sort_columns(Block& src_block, Block& dest_block, bool r return Status::OK(); } -FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t offset, +FullSorter::FullSorter(const VExprContextSPtrs& ordering_expr_ctxs, int64_t limit, int64_t offset, ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first, const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile) - : Sorter(vsort_exec_exprs, state, limit, offset, pool, is_asc_order, nulls_first), + : Sorter(ordering_expr_ctxs, state, limit, offset, pool, is_asc_order, nulls_first), _state(MergeSorterState::create_unique(row_desc, offset)) {} // check whether the unsorted block can hold more data from input block and no need to alloc new memory @@ -219,7 +205,7 @@ size_t FullSorter::get_reserve_mem_size(RuntimeState* state, bool eos) const { // helping data structures used during sorting size_to_reserve += new_rows * sizeof(IColumn::Permutation::value_type); - auto sort_columns_count = _vsort_exec_exprs.ordering_expr_ctxs().size(); + auto sort_columns_count = _ordering_expr_ctxs.size(); if (1 != sort_columns_count) { size_to_reserve += new_rows * sizeof(EqualRangeIterator); } diff --git a/be/src/exec/sort/sorter.h b/be/src/exec/sort/sorter.h index a084517b56e7d5..1651247eecc1ab 100644 --- a/be/src/exec/sort/sorter.h +++ b/be/src/exec/sort/sorter.h @@ -33,8 +33,8 @@ #include "exec/sort/hybrid_sorter.h" #include "exec/sort/sort_cursor.h" #include "exec/sort/sort_description.h" -#include "exec/sort/vsort_exec_exprs.h" #include "exec/sort/vsorted_run_merger.h" +#include "exprs/vexpr_fwd.h" #include "runtime/runtime_profile.h" #include "runtime/runtime_state.h" @@ -101,26 +101,26 @@ class MergeSorterState { class Sorter { public: - Sorter(VSortExecExprs& vsort_exec_exprs, RuntimeState* state, int64_t limit, int64_t offset, - ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first) - : _vsort_exec_exprs(vsort_exec_exprs), + Sorter(const VExprContextSPtrs& ordering_expr_ctxs, RuntimeState* state, int64_t limit, + int64_t offset, ObjectPool* pool, std::vector& is_asc_order, + std::vector& nulls_first) + : _ordering_expr_ctxs(ordering_expr_ctxs), _limit(limit), _offset(offset), _pool(pool), _is_asc_order(is_asc_order), _nulls_first(nulls_first), - _materialize_sort_exprs(vsort_exec_exprs.need_materialize_tuple()), _hybrid_sorter(state->enable_use_hybrid_sort()) {} #ifdef BE_TEST - VSortExecExprs mock_vsort_exec_exprs; + VExprContextSPtrs mock_ordering_expr_ctxs; std::vector mock_is_asc_order; std::vector mock_nulls_first; Sorter() - : _vsort_exec_exprs(mock_vsort_exec_exprs), + : _ordering_expr_ctxs(mock_ordering_expr_ctxs), _is_asc_order(mock_is_asc_order), _nulls_first(mock_nulls_first) {} SortDescription& get_mutable_sort_description() { return _sort_description; } - const VSortExecExprs& get_vsort_exec_exprs() const { return _vsort_exec_exprs; } + const VExprContextSPtrs& get_ordering_expr_ctxs() const { return _ordering_expr_ctxs; } #endif virtual ~Sorter() = default; @@ -159,7 +159,7 @@ class Sorter { Status _prepare_sort_columns(Block& src_block, Block& dest_block, bool reversed = false); bool _enable_spill = false; SortDescription _sort_description; - VSortExecExprs& _vsort_exec_exprs; + const VExprContextSPtrs& _ordering_expr_ctxs; int64_t _limit; int64_t _offset; ObjectPool* _pool = nullptr; @@ -171,7 +171,6 @@ class Sorter { RuntimeProfile::Counter* _partial_sort_counter = nullptr; std::priority_queue _block_priority_queue; - bool _materialize_sort_exprs; HybridSorter _hybrid_sorter; }; @@ -179,8 +178,8 @@ class FullSorter final : public Sorter { ENABLE_FACTORY_CREATOR(FullSorter); public: - FullSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t offset, ObjectPool* pool, - std::vector& is_asc_order, std::vector& nulls_first, + FullSorter(const VExprContextSPtrs& ordering_expr_ctxs, int64_t limit, int64_t offset, + ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first, const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile); ~FullSorter() override = default; diff --git a/be/src/exec/sort/topn_sorter.cpp b/be/src/exec/sort/topn_sorter.cpp index b359ff1d1b316c..fc84cc777a7add 100644 --- a/be/src/exec/sort/topn_sorter.cpp +++ b/be/src/exec/sort/topn_sorter.cpp @@ -32,16 +32,11 @@ class RowDescriptor; class RuntimeProfile; class RuntimeState; -class VSortExecExprs; -} // namespace doris - -namespace doris { - -TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t offset, +TopNSorter::TopNSorter(const VExprContextSPtrs& ordering_expr_ctxs, int64_t limit, int64_t offset, ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first, const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile) - : Sorter(vsort_exec_exprs, state, limit, offset, pool, is_asc_order, nulls_first), + : Sorter(ordering_expr_ctxs, state, limit, offset, pool, is_asc_order, nulls_first), _state(MergeSorterState::create_unique(row_desc, offset)), _row_desc(row_desc) {} diff --git a/be/src/exec/sort/topn_sorter.h b/be/src/exec/sort/topn_sorter.h index c2757aa4b7564a..9da4a81011478c 100644 --- a/be/src/exec/sort/topn_sorter.h +++ b/be/src/exec/sort/topn_sorter.h @@ -32,7 +32,6 @@ class RuntimeProfile; class RuntimeState; class Block; -class VSortExecExprs; } // namespace doris namespace doris { @@ -41,8 +40,8 @@ class TopNSorter final : public Sorter { ENABLE_FACTORY_CREATOR(TopNSorter); public: - TopNSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit, int64_t offset, ObjectPool* pool, - std::vector& is_asc_order, std::vector& nulls_first, + TopNSorter(const VExprContextSPtrs& ordering_expr_ctxs, int64_t limit, int64_t offset, + ObjectPool* pool, std::vector& is_asc_order, std::vector& nulls_first, const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile); ~TopNSorter() override = default; diff --git a/be/src/exec/sort/vsort_exec_exprs.cpp b/be/src/exec/sort/vsort_exec_exprs.cpp deleted file mode 100644 index 33fe1910f7253f..00000000000000 --- a/be/src/exec/sort/vsort_exec_exprs.cpp +++ /dev/null @@ -1,87 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/sort/vsort_exec_exprs.h" - -#include -#include -#include - -#include "exprs/vexpr.h" -#include "exprs/vexpr_context.h" - -namespace doris { -class ObjectPool; -class RowDescriptor; -class RuntimeState; -} // namespace doris - -namespace doris { - -Status VSortExecExprs::init(const TSortInfo& sort_info, ObjectPool* pool) { - return init(sort_info.ordering_exprs, - sort_info.__isset.sort_tuple_slot_exprs ? &sort_info.sort_tuple_slot_exprs : NULL, - pool); -} - -Status VSortExecExprs::init(const std::vector& ordering_exprs, - const std::vector* sort_tuple_slot_exprs, ObjectPool* pool) { - RETURN_IF_ERROR(VExpr::create_expr_trees(ordering_exprs, _ordering_expr_ctxs)); - if (sort_tuple_slot_exprs != NULL) { - _materialize_tuple = true; - RETURN_IF_ERROR( - VExpr::create_expr_trees(*sort_tuple_slot_exprs, _sort_tuple_slot_expr_ctxs)); - } else { - _materialize_tuple = false; - } - return Status::OK(); -} - -Status VSortExecExprs::prepare(RuntimeState* state, const RowDescriptor& child_row_desc, - const RowDescriptor& output_row_desc) { - if (_materialize_tuple) { - RETURN_IF_ERROR(VExpr::prepare(_sort_tuple_slot_expr_ctxs, state, child_row_desc)); - } - RETURN_IF_ERROR(VExpr::prepare(_ordering_expr_ctxs, state, output_row_desc)); - return Status::OK(); -} - -Status VSortExecExprs::open(RuntimeState* state) { - if (_materialize_tuple) { - RETURN_IF_ERROR(VExpr::open(_sort_tuple_slot_expr_ctxs, state)); - } - RETURN_IF_ERROR(VExpr::open(_ordering_expr_ctxs, state)); - return Status::OK(); -} - -void VSortExecExprs::close(RuntimeState* state) {} - -Status VSortExecExprs::clone(RuntimeState* state, VSortExecExprs& new_exprs) { - new_exprs._ordering_expr_ctxs.resize(_ordering_expr_ctxs.size()); - for (size_t i = 0; i < _ordering_expr_ctxs.size(); i++) { - RETURN_IF_ERROR(_ordering_expr_ctxs[i]->clone(state, new_exprs._ordering_expr_ctxs[i])); - } - new_exprs._sort_tuple_slot_expr_ctxs.resize(_sort_tuple_slot_expr_ctxs.size()); - for (size_t i = 0; i < _sort_tuple_slot_expr_ctxs.size(); i++) { - RETURN_IF_ERROR(_sort_tuple_slot_expr_ctxs[i]->clone( - state, new_exprs._sort_tuple_slot_expr_ctxs[i])); - } - new_exprs._materialize_tuple = _materialize_tuple; - return Status::OK(); -} - -} // namespace doris diff --git a/be/src/exec/sort/vsort_exec_exprs.h b/be/src/exec/sort/vsort_exec_exprs.h deleted file mode 100644 index b60b88da0ae4b5..00000000000000 --- a/be/src/exec/sort/vsort_exec_exprs.h +++ /dev/null @@ -1,86 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include "common/status.h" -#include "exprs/vexpr_fwd.h" - -namespace doris { - -class ObjectPool; -class RowDescriptor; -class RuntimeState; -class TExpr; -class TSortInfo; - -// Helper class to Prepare() , Open() and Close() the ordering expressions used to perform -// comparisons in a sort. Used by TopNNode, SortNode. When two -// rows are compared, the ordering expressions are evaluated once for each side. -// TopN and Sort materialize input rows into a single tuple before sorting. -// If _materialize_tuple is true, SortExecExprs also stores the slot expressions used to -// materialize the sort tuples. - -class VSortExecExprs { -public: - // Initialize the expressions from a TSortInfo using the specified pool. - Status init(const TSortInfo& sort_info, ObjectPool* pool); - - // prepare all expressions used for sorting and tuple materialization. - Status prepare(RuntimeState* state, const RowDescriptor& child_row_desc, - const RowDescriptor& output_row_desc); - - // open all expressions used for sorting and tuple materialization. - Status open(RuntimeState* state); - - // close all expressions used for sorting and tuple materialization. - void close(RuntimeState* state); - - const VExprContextSPtrs& sort_tuple_slot_expr_ctxs() const { - return _sort_tuple_slot_expr_ctxs; - } - - // Can only be used after calling prepare() - const VExprContextSPtrs& ordering_expr_ctxs() const { return _ordering_expr_ctxs; } - - bool need_materialize_tuple() const { return _materialize_tuple; } - - Status clone(RuntimeState* state, VSortExecExprs& new_exprs); - -private: - // Create two VExprContexts for evaluating over the TupleRows. - VExprContextSPtrs _ordering_expr_ctxs; - - // If true, the tuples to be sorted are materialized by - // _sort_tuple_slot_exprs before the actual sort is performed. - bool _materialize_tuple; - - // Expressions used to materialize slots in the tuples to be sorted. - // One expr per slot in the materialized tuple. Valid only if - // _materialize_tuple is true. - VExprContextSPtrs _sort_tuple_slot_expr_ctxs; - - // Initialize the ordering and (optionally) materialization expressions from the thrift - // TExprs into the specified pool. sort_tuple_slot_exprs is NULL if the tuple is not - // materialized. - Status init(const std::vector& ordering_exprs, - const std::vector* sort_tuple_slot_exprs, ObjectPool* pool); -}; - -} // namespace doris diff --git a/be/test/core/data_type/data_type_timestamptz_test.cpp b/be/test/core/data_type/data_type_timestamptz_test.cpp index 845bcff0f9e300..8d52dd58cb6289 100644 --- a/be/test/core/data_type/data_type_timestamptz_test.cpp +++ b/be/test/core/data_type/data_type_timestamptz_test.cpp @@ -111,25 +111,17 @@ TEST_F(DataTypeTimeStampTzTest, test_sort) { ObjectPool pool; - VSortExecExprs sort_exec_exprs; + VExprContextSPtrs ordering_expr_ctxs; std::vector is_asc_order {true}; std::vector nulls_first {false}; row_desc.reset(new MockRowDescriptor({std::make_shared()}, &pool)); - sort_exec_exprs._sort_tuple_slot_expr_ctxs = + ordering_expr_ctxs = MockSlotRef::create_mock_contexts(0, std::make_shared()); - sort_exec_exprs._materialize_tuple = false; - - sort_exec_exprs._ordering_expr_ctxs = - MockSlotRef::create_mock_contexts(0, std::make_shared()); - - sort_exec_exprs._sort_tuple_slot_expr_ctxs = - MockSlotRef::create_mock_contexts(0, std::make_shared()); - - sorter = FullSorter::create_unique(sort_exec_exprs, 3, 3, &pool, is_asc_order, nulls_first, + sorter = FullSorter::create_unique(ordering_expr_ctxs, 3, 3, &pool, is_asc_order, nulls_first, *row_desc, &_state, nullptr); sorter->init_profile(&_profile); { diff --git a/be/test/exec/operator/local_merge_sort_source_operator_test.cpp b/be/test/exec/operator/local_merge_sort_source_operator_test.cpp index 500b725eb2b76c..35aabf44be406c 100644 --- a/be/test/exec/operator/local_merge_sort_source_operator_test.cpp +++ b/be/test/exec/operator/local_merge_sort_source_operator_test.cpp @@ -40,12 +40,7 @@ struct LocalMergeSOrtSourceOperatorTest : public testing::Test { op->_is_asc_order = {false}; op->_nulls_first = {false}; - op->_vsort_exec_exprs._sort_tuple_slot_expr_ctxs = - MockSlotRef::create_mock_contexts(std::make_shared()); - - op->_vsort_exec_exprs._materialize_tuple = false; - - op->_vsort_exec_exprs._ordering_expr_ctxs = + op->_ordering_expr_ctxs = MockSlotRef::create_mock_contexts(std::make_shared()); op->init_dependencies_and_sorter(); diff --git a/be/test/exec/operator/partition_sort_sink_operator_test.cpp b/be/test/exec/operator/partition_sort_sink_operator_test.cpp index fa42cc7576bfe4..744ca8e84521f0 100644 --- a/be/test/exec/operator/partition_sort_sink_operator_test.cpp +++ b/be/test/exec/operator/partition_sort_sink_operator_test.cpp @@ -91,12 +91,7 @@ struct PartitionSortOperatorTest : public ::testing::Test { sink->_is_asc_order = {true}; sink->_nulls_first = {false}; - sink->_vsort_exec_exprs._sort_tuple_slot_expr_ctxs = - MockSlotRef::create_mock_contexts(std::make_shared()); - - sink->_vsort_exec_exprs._materialize_tuple = false; - - sink->_vsort_exec_exprs._ordering_expr_ctxs = + sink->_ordering_expr_ctxs = MockSlotRef::create_mock_contexts(std::make_shared()); if (partition_exprs_num > 0) { diff --git a/be/test/exec/operator/sort_operator_test.cpp b/be/test/exec/operator/sort_operator_test.cpp index 6d28e3bb458c84..210ad1a71bd089 100644 --- a/be/test/exec/operator/sort_operator_test.cpp +++ b/be/test/exec/operator/sort_operator_test.cpp @@ -60,12 +60,7 @@ struct SortOperatorTest : public ::testing::Test { sink->_is_asc_order = {true}; sink->_nulls_first = {false}; - sink->_vsort_exec_exprs._sort_tuple_slot_expr_ctxs = - MockSlotRef::create_mock_contexts(std::make_shared()); - - sink->_vsort_exec_exprs._materialize_tuple = false; - - sink->_vsort_exec_exprs._ordering_expr_ctxs = + sink->_ordering_expr_ctxs = MockSlotRef::create_mock_contexts(std::make_shared()); _child_op->_mock_row_desc.reset( diff --git a/be/test/exec/operator/spill_sort_source_operator_test.cpp b/be/test/exec/operator/spill_sort_source_operator_test.cpp index 5e59318dee4c1a..9f458699fa9f03 100644 --- a/be/test/exec/operator/spill_sort_source_operator_test.cpp +++ b/be/test/exec/operator/spill_sort_source_operator_test.cpp @@ -52,7 +52,7 @@ struct SpillSortSourceTestContext { void init_spill_sort_description(SpillSortSharedState* shared_state) { auto* sorter = shared_state->in_mem_shared_state->sorter.get(); auto& sort_desc = sorter->get_mutable_sort_description(); - sort_desc.resize(sorter->get_vsort_exec_exprs().ordering_expr_ctxs().size()); + sort_desc.resize(sorter->get_ordering_expr_ctxs().size()); for (int i = 0; i < static_cast(sort_desc.size()); ++i) { sort_desc[i].column_number = i; sort_desc[i].direction = 1; @@ -352,7 +352,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill) { auto* sorter = shared_state->in_mem_shared_state->sorter.get(); auto& sort_desc = sorter->get_mutable_sort_description(); - sort_desc.resize(sorter->get_vsort_exec_exprs().ordering_expr_ctxs().size()); + sort_desc.resize(sorter->get_ordering_expr_ctxs().size()); for (int i = 0; i < (int)sort_desc.size(); i++) { sort_desc[i].column_number = i; sort_desc[i].direction = 1; @@ -543,7 +543,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpill2) { auto* sorter = shared_state->in_mem_shared_state->sorter.get(); auto& sort_desc = sorter->get_mutable_sort_description(); - sort_desc.resize(sorter->get_vsort_exec_exprs().ordering_expr_ctxs().size()); + sort_desc.resize(sorter->get_ordering_expr_ctxs().size()); for (int i = 0; i < (int)sort_desc.size(); i++) { sort_desc[i].column_number = i; sort_desc[i].direction = 1; @@ -888,7 +888,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSpillError) { auto* sorter = shared_state->in_mem_shared_state->sorter.get(); auto& sort_desc = sorter->get_mutable_sort_description(); - sort_desc.resize(sorter->get_vsort_exec_exprs().ordering_expr_ctxs().size()); + sort_desc.resize(sorter->get_ordering_expr_ctxs().size()); for (int i = 0; i < (int)sort_desc.size(); i++) { sort_desc[i].column_number = i; sort_desc[i].direction = 1; @@ -1009,7 +1009,7 @@ TEST_F(SpillSortSourceOperatorTest, GetBlockWithSingleSpillFile) { auto* sorter = shared_state->in_mem_shared_state->sorter.get(); auto& sort_desc = sorter->get_mutable_sort_description(); - sort_desc.resize(sorter->get_vsort_exec_exprs().ordering_expr_ctxs().size()); + sort_desc.resize(sorter->get_ordering_expr_ctxs().size()); for (int i = 0; i < (int)sort_desc.size(); i++) { sort_desc[i].column_number = i; sort_desc[i].direction = 1; @@ -1153,7 +1153,7 @@ TEST_F(SpillSortSourceOperatorTest, EndToEndSinkAndSource) { // Read back from source auto* sorter = shared_state->in_mem_shared_state->sorter.get(); auto& sort_desc = sorter->get_mutable_sort_description(); - sort_desc.resize(sorter->get_vsort_exec_exprs().ordering_expr_ctxs().size()); + sort_desc.resize(sorter->get_ordering_expr_ctxs().size()); for (int i = 0; i < (int)sort_desc.size(); i++) { sort_desc[i].column_number = i; sort_desc[i].direction = 1; diff --git a/be/test/exec/sort/full_sort_test.cpp b/be/test/exec/sort/full_sort_test.cpp index 8eb927e34a5a56..e182048c807dad 100644 --- a/be/test/exec/sort/full_sort_test.cpp +++ b/be/test/exec/sort/full_sort_test.cpp @@ -32,7 +32,7 @@ #include "exec/sort/heap_sorter.h" #include "exec/sort/sorter.h" #include "exec/sort/topn_sorter.h" -#include "exec/sort/vsort_exec_exprs.h" +#include "exprs/vexpr_fwd.h" #include "runtime/runtime_state.h" #include "testutil/column_helper.h" #include "testutil/mock/mock_descriptors.h" @@ -45,15 +45,7 @@ struct FullSorterTest : public testing::Test { void SetUp() override { row_desc.reset(new MockRowDescriptor({std::make_shared()}, &pool)); - sort_exec_exprs._sort_tuple_slot_expr_ctxs = - MockSlotRef::create_mock_contexts(0, std::make_shared()); - - sort_exec_exprs._materialize_tuple = false; - - sort_exec_exprs._ordering_expr_ctxs = - MockSlotRef::create_mock_contexts(0, std::make_shared()); - - sort_exec_exprs._sort_tuple_slot_expr_ctxs = + ordering_expr_ctxs = MockSlotRef::create_mock_contexts(0, std::make_shared()); } MockRuntimeState _state; @@ -65,14 +57,14 @@ struct FullSorterTest : public testing::Test { ObjectPool pool; - VSortExecExprs sort_exec_exprs; + VExprContextSPtrs ordering_expr_ctxs; std::vector is_asc_order {true}; std::vector nulls_first {false}; }; TEST_F(FullSorterTest, test_full_sorter1) { - sorter = FullSorter::create_unique(sort_exec_exprs, -1, 0, &pool, is_asc_order, nulls_first, + sorter = FullSorter::create_unique(ordering_expr_ctxs, -1, 0, &pool, is_asc_order, nulls_first, *row_desc, &_state, nullptr); Block block1 = ColumnHelper::create_block({1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); @@ -84,7 +76,7 @@ TEST_F(FullSorterTest, test_full_sorter1) { } TEST_F(FullSorterTest, test_full_sorter2) { - sorter = FullSorter::create_unique(sort_exec_exprs, -1, 0, &pool, is_asc_order, nulls_first, + sorter = FullSorter::create_unique(ordering_expr_ctxs, -1, 0, &pool, is_asc_order, nulls_first, *row_desc, &_state, nullptr); { Block block = ColumnHelper::create_block({1, 2, 3, 4, 5, 6, 7, 8, 9, 10}); @@ -103,7 +95,7 @@ TEST_F(FullSorterTest, test_full_sorter2) { } TEST_F(FullSorterTest, test_full_sorter3) { - sorter = FullSorter::create_unique(sort_exec_exprs, 3, 3, &pool, is_asc_order, nulls_first, + sorter = FullSorter::create_unique(ordering_expr_ctxs, 3, 3, &pool, is_asc_order, nulls_first, *row_desc, &_state, nullptr); sorter->init_profile(&_profile); { diff --git a/be/test/exec/sort/heap_sorter_test.cpp b/be/test/exec/sort/heap_sorter_test.cpp index aa64259b58237e..90b06764175f1e 100644 --- a/be/test/exec/sort/heap_sorter_test.cpp +++ b/be/test/exec/sort/heap_sorter_test.cpp @@ -33,7 +33,7 @@ #include "core/block/block.h" #include "exec/sort/sorter.h" #include "exec/sort/topn_sorter.h" -#include "exec/sort/vsort_exec_exprs.h" +#include "exprs/vexpr_fwd.h" #include "runtime/runtime_state.h" #include "testutil/column_helper.h" #include "testutil/mock/mock_descriptors.h" @@ -46,15 +46,7 @@ struct HeapSorterTest : public testing::Test { void SetUp() override { row_desc.reset(new MockRowDescriptor({std::make_shared()}, &pool)); - sort_exec_exprs._sort_tuple_slot_expr_ctxs = - MockSlotRef::create_mock_contexts(0, std::make_shared()); - - sort_exec_exprs._materialize_tuple = false; - - sort_exec_exprs._ordering_expr_ctxs = - MockSlotRef::create_mock_contexts(0, std::make_shared()); - - sort_exec_exprs._sort_tuple_slot_expr_ctxs = + ordering_expr_ctxs = MockSlotRef::create_mock_contexts(0, std::make_shared()); } MockRuntimeState _state; @@ -66,7 +58,7 @@ struct HeapSorterTest : public testing::Test { ObjectPool pool; - VSortExecExprs sort_exec_exprs; + VExprContextSPtrs ordering_expr_ctxs; std::vector is_asc_order {true, true}; std::vector nulls_first {false, false}; @@ -76,15 +68,9 @@ TEST_F(HeapSorterTest, test_topn_sorter1) { DataTypes data_types {std::make_shared(), std::make_shared()}; row_desc.reset(new MockRowDescriptor(data_types, &pool)); - sort_exec_exprs._sort_tuple_slot_expr_ctxs = MockSlotRef::create_mock_contexts(data_types); - - sort_exec_exprs._materialize_tuple = true; - - sort_exec_exprs._ordering_expr_ctxs = MockSlotRef::create_mock_contexts(data_types); - - sort_exec_exprs._sort_tuple_slot_expr_ctxs = MockSlotRef::create_mock_contexts(data_types); + ordering_expr_ctxs = MockSlotRef::create_mock_contexts(data_types); - sorter = HeapSorter::create_unique(sort_exec_exprs, &_state, 6, 0, &pool, is_asc_order, + sorter = HeapSorter::create_unique(ordering_expr_ctxs, &_state, 6, 0, &pool, is_asc_order, nulls_first, *row_desc); sorter->init_profile(&_profile); diff --git a/be/test/exec/sort/merge_sorter_state.cpp b/be/test/exec/sort/merge_sorter_state.cpp index 7b859148fd3cc9..0dc8a1a8937164 100644 --- a/be/test/exec/sort/merge_sorter_state.cpp +++ b/be/test/exec/sort/merge_sorter_state.cpp @@ -32,7 +32,7 @@ #include "exec/sort/heap_sorter.h" #include "exec/sort/sorter.h" #include "exec/sort/topn_sorter.h" -#include "exec/sort/vsort_exec_exprs.h" +#include "exprs/vexpr_fwd.h" #include "runtime/runtime_state.h" #include "testutil/column_helper.h" #include "testutil/mock/mock_descriptors.h" diff --git a/be/test/exec/sort/partition_sorter_test.cpp b/be/test/exec/sort/partition_sorter_test.cpp index d70c6305ae1bdd..8bf863e840c873 100644 --- a/be/test/exec/sort/partition_sorter_test.cpp +++ b/be/test/exec/sort/partition_sorter_test.cpp @@ -34,7 +34,7 @@ #include "exec/sort/heap_sorter.h" #include "exec/sort/sorter.h" #include "exec/sort/topn_sorter.h" -#include "exec/sort/vsort_exec_exprs.h" +#include "exprs/vexpr_fwd.h" #include "runtime/runtime_state.h" #include "testutil/column_helper.h" #include "testutil/mock/mock_descriptors.h" @@ -47,15 +47,7 @@ struct PartitionSorterTest : public testing::Test { void SetUp() override { row_desc.reset(new MockRowDescriptor({std::make_shared()}, &pool)); - sort_exec_exprs._sort_tuple_slot_expr_ctxs = - MockSlotRef::create_mock_contexts(0, std::make_shared()); - - sort_exec_exprs._materialize_tuple = false; - - sort_exec_exprs._ordering_expr_ctxs = - MockSlotRef::create_mock_contexts(0, std::make_shared()); - - sort_exec_exprs._sort_tuple_slot_expr_ctxs = + ordering_expr_ctxs = MockSlotRef::create_mock_contexts(0, std::make_shared()); } MockRuntimeState _state; @@ -67,14 +59,14 @@ struct PartitionSorterTest : public testing::Test { ObjectPool pool; - VSortExecExprs sort_exec_exprs; + VExprContextSPtrs ordering_expr_ctxs; std::vector is_asc_order {true}; std::vector nulls_first {false}; }; TEST_F(PartitionSorterTest, test_partition_sorter_read_row_num) { - sorter = PartitionSorter::create_unique(sort_exec_exprs, -1, 0, &pool, is_asc_order, + sorter = PartitionSorter::create_unique(ordering_expr_ctxs, -1, 0, &pool, is_asc_order, nulls_first, *row_desc, &_state, nullptr, false, 20, TopNAlgorithm::ROW_NUMBER, nullptr); sorter->init_profile(&_profile); @@ -120,7 +112,7 @@ TEST_F(PartitionSorterTest, test_partition_sorter_read_row_num) { TEST_F(PartitionSorterTest, test_partition_sorter_DENSE_RANK) { SortCursorCmp previous_row; - sorter = PartitionSorter::create_unique(sort_exec_exprs, -1, 0, &pool, is_asc_order, + sorter = PartitionSorter::create_unique(ordering_expr_ctxs, -1, 0, &pool, is_asc_order, nulls_first, *row_desc, &_state, nullptr, false, 20, TopNAlgorithm::DENSE_RANK, &previous_row); sorter->init_profile(&_profile); @@ -159,7 +151,7 @@ TEST_F(PartitionSorterTest, test_partition_sorter_DENSE_RANK) { TEST_F(PartitionSorterTest, test_partition_sorter_RANK) { SortCursorCmp previous_row; - sorter = PartitionSorter::create_unique(sort_exec_exprs, -1, 0, &pool, is_asc_order, + sorter = PartitionSorter::create_unique(ordering_expr_ctxs, -1, 0, &pool, is_asc_order, nulls_first, *row_desc, &_state, nullptr, false, 20, TopNAlgorithm::RANK, &previous_row); sorter->init_profile(&_profile); diff --git a/be/test/exec/sort/sort_test.cpp b/be/test/exec/sort/sort_test.cpp index 1f40a3665e2bc2..c21c6e03a8c9bd 100644 --- a/be/test/exec/sort/sort_test.cpp +++ b/be/test/exec/sort/sort_test.cpp @@ -31,7 +31,7 @@ #include "exec/sort/heap_sorter.h" #include "exec/sort/sorter.h" #include "exec/sort/topn_sorter.h" -#include "exec/sort/vsort_exec_exprs.h" +#include "exprs/vexpr_fwd.h" #include "format/orc/vorc_reader.h" #include "runtime/runtime_state.h" #include "testutil/column_helper.h" @@ -54,25 +54,21 @@ class SortTestParam { std::vector data_types {std::make_shared()}; row_desc = std::make_unique(data_types, &pool); - sort_exec_exprs._sort_tuple_slot_expr_ctxs.push_back( - VExprContext::create_shared(std::make_shared(0))); - - sort_exec_exprs._materialize_tuple = false; - - sort_exec_exprs._ordering_expr_ctxs.push_back( - VExprContext::create_shared(std::make_shared(0))); + ordering_expr_ctxs.push_back(VExprContext::create_shared(std::make_shared(0))); switch (sort_type) { case TestSortType::FULL_SORT: - sorter = FullSorter::create_unique(sort_exec_exprs, limit, offset, &pool, is_asc_order, - nulls_first, *row_desc, &_state, nullptr); + sorter = FullSorter::create_unique(ordering_expr_ctxs, limit, offset, &pool, + is_asc_order, nulls_first, *row_desc, &_state, + nullptr); break; case TestSortType::TOPN_SORT: - sorter = TopNSorter::create_unique(sort_exec_exprs, limit, offset, &pool, is_asc_order, - nulls_first, *row_desc, &_state, nullptr); + sorter = TopNSorter::create_unique(ordering_expr_ctxs, limit, offset, &pool, + is_asc_order, nulls_first, *row_desc, &_state, + nullptr); break; case TestSortType::HEAP_SORT: - sorter = HeapSorter::create_unique(sort_exec_exprs, &_state, limit, offset, &pool, + sorter = HeapSorter::create_unique(ordering_expr_ctxs, &_state, limit, offset, &pool, is_asc_order, nulls_first, *row_desc); break; default: @@ -114,7 +110,7 @@ class SortTestParam { TestSortType sort_type; int64_t limit; int64_t offset; - VSortExecExprs sort_exec_exprs; + VExprContextSPtrs ordering_expr_ctxs; ObjectPool pool; std::unique_ptr row_desc; std::unique_ptr profile = std::make_unique(""); @@ -179,7 +175,7 @@ TEST_F(SortTest, test_heap_sort) { } TEST_F(SortTest, test_sorter) { - VSortExecExprs sort_exec_exprs; + VExprContextSPtrs ordering_expr_ctxs; ObjectPool pool; std::unique_ptr row_desc; std::unique_ptr profile = std::make_unique(""); @@ -191,21 +187,15 @@ TEST_F(SortTest, test_sorter) { DataTypes data_types {std::make_shared(), std::make_shared()}; row_desc.reset(new MockRowDescriptor(data_types, &pool)); - sort_exec_exprs._sort_tuple_slot_expr_ctxs = MockSlotRef::create_mock_contexts(data_types); - - sort_exec_exprs._materialize_tuple = true; - - sort_exec_exprs._ordering_expr_ctxs = MockSlotRef::create_mock_contexts(data_types); - - sort_exec_exprs._sort_tuple_slot_expr_ctxs = MockSlotRef::create_mock_contexts(data_types); + ordering_expr_ctxs = MockSlotRef::create_mock_contexts(data_types); MockRuntimeState _state; - sorter = FullSorter::create_unique(sort_exec_exprs, -1, 0, &pool, is_asc_order, nulls_first, + sorter = FullSorter::create_unique(ordering_expr_ctxs, -1, 0, &pool, is_asc_order, nulls_first, *row_desc, &_state, nullptr); { Block src_block = ColumnHelper::create_block({4, 1, 2}, {10, 1, 3}); - Block dest_block; + Block dest_block = src_block.clone_empty(); auto st = sorter->partial_sort(src_block, dest_block); EXPECT_TRUE(st.ok()) << st.msg(); std::cout << dest_block.dump_data() << std::endl; diff --git a/be/test/exec/sort/topn_sort_test.cpp b/be/test/exec/sort/topn_sort_test.cpp index 54e80c51a4b62e..fcb82c91103bfd 100644 --- a/be/test/exec/sort/topn_sort_test.cpp +++ b/be/test/exec/sort/topn_sort_test.cpp @@ -32,7 +32,7 @@ #include "exec/sort/heap_sorter.h" #include "exec/sort/sorter.h" #include "exec/sort/topn_sorter.h" -#include "exec/sort/vsort_exec_exprs.h" +#include "exprs/vexpr_fwd.h" #include "runtime/runtime_state.h" #include "testutil/column_helper.h" #include "testutil/mock/mock_descriptors.h" @@ -45,15 +45,7 @@ struct TopNSorterTest : public testing::Test { void SetUp() override { row_desc.reset(new MockRowDescriptor({std::make_shared()}, &pool)); - sort_exec_exprs._sort_tuple_slot_expr_ctxs = - MockSlotRef::create_mock_contexts(0, std::make_shared()); - - sort_exec_exprs._materialize_tuple = false; - - sort_exec_exprs._ordering_expr_ctxs = - MockSlotRef::create_mock_contexts(0, std::make_shared()); - - sort_exec_exprs._sort_tuple_slot_expr_ctxs = + ordering_expr_ctxs = MockSlotRef::create_mock_contexts(0, std::make_shared()); } MockRuntimeState _state; @@ -65,14 +57,14 @@ struct TopNSorterTest : public testing::Test { ObjectPool pool; - VSortExecExprs sort_exec_exprs; + VExprContextSPtrs ordering_expr_ctxs; std::vector is_asc_order {true}; std::vector nulls_first {false}; }; TEST_F(TopNSorterTest, test_topn_sorter1) { - sorter = TopNSorter::create_unique(sort_exec_exprs, 3, 3, &pool, is_asc_order, nulls_first, + sorter = TopNSorter::create_unique(ordering_expr_ctxs, 3, 3, &pool, is_asc_order, nulls_first, *row_desc, &_state, nullptr); sorter->init_profile(&_profile); { @@ -93,7 +85,7 @@ TEST_F(TopNSorterTest, test_topn_sorter1) { } TEST_F(TopNSorterTest, test_topn_sorter2) { - sorter = TopNSorter::create_unique(sort_exec_exprs, -1, 3, &pool, is_asc_order, nulls_first, + sorter = TopNSorter::create_unique(ordering_expr_ctxs, -1, 3, &pool, is_asc_order, nulls_first, *row_desc, &_state, nullptr); sorter->init_profile(&_profile); {