From aaf3e16d6658800264772f812a2ac62d81395f1a Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 1 Sep 2023 10:00:02 +0800 Subject: [PATCH 1/3] [pipelineX](minor) Add type check for cast --- be/src/pipeline/pipeline_x/operator.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 85cdeb7226ca8e..e855ad2ec03be4 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -49,10 +49,16 @@ class PipelineXLocalStateBase { template TARGET& cast() { + DCHECK(dynamic_cast(this)) + << " Mismatch type! Current type is " << typeid(*this).name() + << " and expect type is" << typeid(TARGET).name(); return reinterpret_cast(*this); } template const TARGET& cast() const { + DCHECK(dynamic_cast(this)) + << " Mismatch type! Current type is " << typeid(*this).name() + << " and expect type is" << typeid(TARGET).name(); return reinterpret_cast(*this); } From 9cbdfb10ec5ca155659e5e0e6b071bb3273ddffd Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 1 Sep 2023 16:14:24 +0800 Subject: [PATCH 2/3] [pipelineX](join) support nested loop operator --- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- .../pipeline/exec/hashjoin_probe_operator.cpp | 6 +- .../pipeline/exec/hashjoin_probe_operator.h | 7 +- .../exec/join_build_sink_operator.cpp | 5 +- be/src/pipeline/exec/join_probe_operator.cpp | 35 +- be/src/pipeline/exec/join_probe_operator.h | 12 +- .../exec/nested_loop_join_build_operator.cpp | 95 ++- .../exec/nested_loop_join_build_operator.h | 62 +- .../exec/nested_loop_join_probe_operator.cpp | 552 ++++++++++++++++++ .../exec/nested_loop_join_probe_operator.h | 183 ++++++ be/src/pipeline/pipeline_x/dependency.cpp | 21 +- be/src/pipeline/pipeline_x/dependency.h | 46 +- be/src/pipeline/pipeline_x/operator.cpp | 4 + .../pipeline_x_fragment_context.cpp | 20 + .../vec/exec/join/vnested_loop_join_node.cpp | 49 +- be/src/vec/exec/join/vnested_loop_join_node.h | 16 + 16 files changed, 1036 insertions(+), 79 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 56d9ec050cdd4a..be059defa7aa77 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -47,7 +47,7 @@ class HashJoinBuildSink final : public StreamingOperator { + : public JoinBuildSinkLocalState { public: ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState); using Parent = HashJoinBuildSinkOperatorX; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 587631ded1e2be..cfe9feadeb92f7 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -27,9 +27,7 @@ namespace pipeline { OPERATOR_CODE_GENERATOR(HashJoinProbeOperator, StatefulOperator) HashJoinProbeLocalState::HashJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent) - : JoinProbeLocalState(state, parent), - _child_block(vectorized::Block::create_unique()), - _child_source_state(SourceState::DEPEND_ON_SOURCE) {} + : JoinProbeLocalState(state, parent) {} Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(JoinProbeLocalState::init(state, info)); @@ -96,7 +94,7 @@ Status HashJoinProbeLocalState::close(RuntimeState* state) { _tuple_is_null_left_flag_column = nullptr; _tuple_is_null_right_flag_column = nullptr; _probe_block.clear(); - return JoinProbeLocalState::close(state); + return JoinProbeLocalState::close(state); } bool HashJoinProbeLocalState::_need_probe_null_map(vectorized::Block& block, diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 72c2c1bf2e603b..ce60dd1545bf6d 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -48,7 +48,7 @@ class HashJoinProbeOperator final : public StatefulOperator { + : public JoinProbeLocalState { public: using Parent = HashJoinProbeOperatorX; ENABLE_FACTORY_CREATOR(HashJoinProbeLocalState); @@ -59,7 +59,7 @@ class HashJoinProbeLocalState final Status close(RuntimeState* state) override; void prepare_for_next(); - void add_tuple_is_null_column(vectorized::Block* block); + void add_tuple_is_null_column(vectorized::Block* block) override; void init_for_probe(RuntimeState* state); HashJoinProbeOperatorX* join_probe() { return (HashJoinProbeOperatorX*)_parent; } @@ -70,9 +70,6 @@ class HashJoinProbeLocalState final friend class HashJoinProbeOperatorX; friend struct vectorized::HashJoinProbeContext; - std::unique_ptr _child_block; - SourceState _child_source_state; - int _probe_index = -1; bool _probe_eos = false; std::atomic _probe_inited = false; diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp b/be/src/pipeline/exec/join_build_sink_operator.cpp index 5de7a2504bb6d4..f64f9128d76973 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.cpp +++ b/be/src/pipeline/exec/join_build_sink_operator.cpp @@ -18,6 +18,7 @@ #include "join_build_sink_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" +#include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/pipeline_x/operator.h" namespace doris::pipeline { @@ -117,6 +118,8 @@ void JoinBuildSinkOperatorX::_init_join_op() { } template class JoinBuildSinkOperatorX; -template class JoinBuildSinkLocalState; +template class JoinBuildSinkLocalState; +template class JoinBuildSinkOperatorX; +template class JoinBuildSinkLocalState; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 3670d640d5dc5b..8cc8fbd4b24c14 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -18,6 +18,7 @@ #include "join_probe_operator.h" #include "pipeline/exec/hashjoin_probe_operator.h" +#include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/operator.h" namespace doris::pipeline { @@ -25,9 +26,8 @@ namespace doris::pipeline { template Status JoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { - RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); - auto& p = - PipelineXLocalState::_parent->template cast(); + RETURN_IF_ERROR(Base::init(state, info)); + auto& p = Base::_parent->template cast(); // only use in outer join as the bool column to mark for function of `tuple_is_null` if (p._is_outer_join) { _tuple_is_null_left_flag_column = vectorized::ColumnUInt8::create(); @@ -38,8 +38,7 @@ Status JoinProbeLocalState::init(RuntimeState* state, RETURN_IF_ERROR(p._output_expr_ctxs[i]->clone(state, _output_expr_ctxs[i])); } - _probe_phase_profile = - PipelineXLocalState::profile()->create_child("ProbePhase", true, true); + _probe_phase_profile = Base::profile()->create_child("ProbePhase", true, true); _probe_timer = ADD_TIMER(_probe_phase_profile, "ProbeTime"); _join_filter_timer = ADD_CHILD_TIMER(_probe_phase_profile, "JoinFilterTimer", "ProbeTime"); _build_output_block_timer = @@ -51,17 +50,16 @@ Status JoinProbeLocalState::init(RuntimeState* state, template Status JoinProbeLocalState::close(RuntimeState* state) { - if (PipelineXLocalState::_closed) { + if (Base::_closed) { return Status::OK(); } _join_block.clear(); - return PipelineXLocalState::close(state); + return Base::close(state); } template void JoinProbeLocalState::_construct_mutable_join_block() { - auto& p = - PipelineXLocalState::_parent->template cast(); + auto& p = Base::_parent->template cast(); const auto& mutable_block_desc = p._intermediate_row_desc; for (const auto tuple_desc : mutable_block_desc->tuple_descriptors()) { for (const auto slot_desc : tuple_desc->slots()) { @@ -79,8 +77,7 @@ void JoinProbeLocalState::_construct_mutable_join_block template Status JoinProbeLocalState::_build_output_block( vectorized::Block* origin_block, vectorized::Block* output_block, bool keep_origin) { - auto& p = - PipelineXLocalState::_parent->template cast(); + auto& p = Base::_parent->template cast(); SCOPED_TIMER(_build_output_block_timer); auto is_mem_reuse = output_block->mem_reuse(); vectorized::MutableBlock mutable_block = @@ -119,7 +116,7 @@ Status JoinProbeLocalState::_build_output_block( } } else { DCHECK(mutable_columns.size() == p.row_desc().num_materialized_slots()); - SCOPED_TIMER(PipelineXLocalState::_projection_timer); + SCOPED_TIMER(Base::_projection_timer); for (int i = 0; i < mutable_columns.size(); ++i) { auto result_column_id = -1; RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id)); @@ -151,8 +148,7 @@ Status JoinProbeLocalState::_build_output_block( template void JoinProbeLocalState::_reset_tuple_is_null_column() { - if (PipelineXLocalState::_parent->template cast() - ._is_outer_join) { + if (Base::_parent->template cast()._is_outer_join) { reinterpret_cast(*_tuple_is_null_left_flag_column).clear(); reinterpret_cast(*_tuple_is_null_right_flag_column).clear(); } @@ -161,7 +157,7 @@ void JoinProbeLocalState::_reset_tuple_is_null_column() template JoinProbeOperatorX::JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : OperatorX(pool, tnode, descs), + : Base(pool, tnode, descs), _join_op(tnode.__isset.hash_join_node ? tnode.hash_join_node.join_op : (tnode.__isset.nested_loop_join_node ? tnode.nested_loop_join_node.join_op @@ -210,7 +206,7 @@ JoinProbeOperatorX::JoinProbeOperatorX(ObjectPool* pool, const T template Status JoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(OperatorX::init(tnode, state)); + RETURN_IF_ERROR(Base::init(tnode, state)); if (tnode.__isset.hash_join_node || tnode.__isset.nested_loop_join_node) { const auto& output_exprs = tnode.__isset.hash_join_node ? tnode.hash_join_node.srcExprList @@ -227,11 +223,14 @@ Status JoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeS template Status JoinProbeOperatorX::open(doris::RuntimeState* state) { - RETURN_IF_ERROR(OperatorX::open(state)); + RETURN_IF_ERROR(Base::open(state)); return vectorized::VExpr::open(_output_expr_ctxs, state); } -template class JoinProbeLocalState; +template class JoinProbeLocalState; template class JoinProbeOperatorX; +template class JoinProbeLocalState; +template class JoinProbeOperatorX; + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index e39e9a20f549d1..00f476792337f7 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -30,16 +30,20 @@ class JoinProbeOperatorX; template class JoinProbeLocalState : public PipelineXLocalState { public: + using Base = PipelineXLocalState; virtual Status init(RuntimeState* state, LocalStateInfo& info) override; virtual Status close(RuntimeState* state) override; + virtual void add_tuple_is_null_column(vectorized::Block* block) = 0; protected: JoinProbeLocalState(RuntimeState* state, OperatorXBase* parent) - : PipelineXLocalState(state, parent) {} + : Base(state, parent), + _child_block(vectorized::Block::create_unique()), + _child_source_state(SourceState::DEPEND_ON_SOURCE) {} virtual ~JoinProbeLocalState() = default; void _construct_mutable_join_block(); Status _build_output_block(vectorized::Block* origin_block, vectorized::Block* output_block, - bool keep_origin); + bool keep_origin = true); void _reset_tuple_is_null_column(); // output expr vectorized::VExprContextSPtrs _output_expr_ctxs; @@ -52,11 +56,15 @@ class JoinProbeLocalState : public PipelineXLocalState { RuntimeProfile::Counter* _probe_rows_counter; RuntimeProfile::Counter* _join_filter_timer; RuntimeProfile::Counter* _build_output_block_timer; + + std::unique_ptr _child_block; + SourceState _child_source_state; }; template class JoinProbeOperatorX : public OperatorX { public: + using Base = OperatorX; JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); virtual Status init(const TPlanNode& tnode, RuntimeState* state) override; diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index c49e070ef1d95b..c0c04fdb123456 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -25,4 +25,97 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(NestLoopJoinBuildOperator, StreamingOperator) -} // namespace doris::pipeline \ No newline at end of file +NestedLoopJoinBuildSinkLocalState::NestedLoopJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, + RuntimeState* state) + : JoinBuildSinkLocalState( + parent, state) {} + +Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info)); + auto& p = _parent->cast(); + _filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size()); + for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, _filter_src_expr_ctxs[i])); + } + return Status::OK(); +} + +const std::vector& NestedLoopJoinBuildSinkLocalState::runtime_filter_descs() { + return _parent->cast()._runtime_filter_descs; +} + +NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, + const TPlanNode& tnode, + const DescriptorTbl& descs) + : JoinBuildSinkOperatorX(pool, tnode, descs), + _runtime_filter_descs(tnode.runtime_filters), + _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && + tnode.nested_loop_join_node.is_output_left_side_only), + _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} + +Status NestedLoopJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state)); + + std::vector filter_src_exprs; + for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { + filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr); + RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( + _runtime_filter_descs[i], state->query_options())); + } + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs, _filter_src_expr_ctxs)); + return Status::OK(); +} + +Status NestedLoopJoinBuildSinkOperatorX::prepare(RuntimeState* state) { + // pre-compute the tuple index of build tuples in the output row + int num_build_tuples = _child_x->row_desc().tuple_descriptors().size(); + + for (int i = 0; i < num_build_tuples; ++i) { + TupleDescriptor* build_tuple_desc = _child_x->row_desc().tuple_descriptors()[i]; + auto tuple_idx = _row_descriptor.get_tuple_idx(build_tuple_desc->id()); + RETURN_IF_INVALID_TUPLE_IDX(build_tuple_desc->id(), tuple_idx); + } + RETURN_IF_ERROR(vectorized::VExpr::prepare(_filter_src_expr_ctxs, state, _child_x->row_desc())); + return Status::OK(); +} + +Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) { + return vectorized::VExpr::open(_filter_src_expr_ctxs, state); +} + +Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* block, + SourceState source_state) { + auto& local_state = + state->get_sink_local_state(id())->cast(); + SCOPED_TIMER(local_state._build_timer); + auto rows = block->rows(); + auto mem_usage = block->allocated_bytes(); + + if (rows != 0) { + local_state._build_rows += rows; + local_state._total_mem_usage += mem_usage; + local_state._shared_state->build_blocks.emplace_back(std::move(*block)); + if (_match_all_build || _is_right_semi_anti) { + local_state._shared_state->build_side_visited_flags.emplace_back( + vectorized::ColumnUInt8::create(rows, 0)); + } + } + + if (source_state == SourceState::FINISHED) { + COUNTER_UPDATE(local_state._build_rows_counter, local_state._build_rows); + vectorized::RuntimeFilterBuild rf_ctx(&local_state); + RETURN_IF_ERROR(rf_ctx(state)); + + // optimize `in bitmap`, see https://github.com/apache/doris/issues/14338 + if (_is_output_left_side_only && ((_join_op == TJoinOp::type::LEFT_SEMI_JOIN && + local_state._shared_state->build_blocks.empty()) || + (_join_op == TJoinOp::type::LEFT_ANTI_JOIN && + !local_state._shared_state->build_blocks.empty()))) { + local_state._shared_state->left_side_eos = true; + } + } + + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 33918f0a0c8952..5c4ce864e3422c 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -20,6 +20,8 @@ #include #include "operator.h" +#include "pipeline/exec/join_build_sink_operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/exec/join/vnested_loop_join_node.h" namespace doris { @@ -42,5 +44,63 @@ class NestLoopJoinBuildOperator final : public StreamingOperator { +public: + ENABLE_FACTORY_CREATOR(NestedLoopJoinBuildSinkLocalState); + using Parent = NestedLoopJoinBuildSinkOperatorX; + NestedLoopJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state); + ~NestedLoopJoinBuildSinkLocalState() = default; + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + + const std::vector& runtime_filter_descs(); + vectorized::VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; } + RuntimeProfile::Counter* push_compute_timer() { return _push_compute_timer; } + vectorized::Blocks& build_blocks() { return _shared_state->build_blocks; } + RuntimeProfile::Counter* push_down_timer() { return _push_down_timer; } + +private: + friend class NestedLoopJoinBuildSinkOperatorX; + uint64_t _build_rows = 0; + uint64_t _total_mem_usage = 0; + + vectorized::VExprContextSPtrs _filter_src_expr_ctxs; +}; + +class NestedLoopJoinBuildSinkOperatorX final + : public JoinBuildSinkOperatorX { +public: + NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + Status init(const TDataSink& tsink) override { + return Status::InternalError( + "{} should not init with TDataSink", + JoinBuildSinkOperatorX::_name); + } + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override; + + virtual bool can_write(RuntimeState* state) override { return true; } + +private: + friend class NestedLoopJoinBuildSinkLocalState; + + vectorized::VExprContextSPtrs _filter_src_expr_ctxs; + + const std::vector _runtime_filter_descs; + const bool _is_output_left_side_only; + RowDescriptor _row_descriptor; +}; + } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index cdc363043301dd..0665a9e6f9a496 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -42,4 +42,556 @@ Status NestLoopJoinProbeOperator::close(doris::RuntimeState* state) { return StatefulOperator::close(state); } +NestedLoopJoinProbeLocalState::NestedLoopJoinProbeLocalState(RuntimeState* state, + OperatorXBase* parent) + : JoinProbeLocalState(state, + parent), + _matched_rows_done(false), + _left_block_pos(0) {} + +Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(JoinProbeLocalState::init(state, info)); + auto& p = _parent->cast(); + _join_conjuncts.resize(p._join_conjuncts.size()); + for (size_t i = 0; i < _join_conjuncts.size(); i++) { + RETURN_IF_ERROR(p._join_conjuncts[i]->clone(state, _join_conjuncts[i])); + } + _construct_mutable_join_block(); + return Status::OK(); +} + +Status NestedLoopJoinProbeLocalState::close(RuntimeState* state) { + _left_block.clear(); + + vectorized::Blocks tmp_build_blocks; + _shared_state->build_blocks.swap(tmp_build_blocks); + + vectorized::MutableColumns tmp_build_side_visited_flags; + _shared_state->build_side_visited_flags.swap(tmp_build_side_visited_flags); + + _tuple_is_null_left_flag_column = nullptr; + _tuple_is_null_right_flag_column = nullptr; + return JoinProbeLocalState::close( + state); +} + +void NestedLoopJoinProbeLocalState::_update_additional_flags(vectorized::Block* block) { + auto& p = _parent->cast(); + if (p._is_outer_join) { + auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); + auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); + auto& left_null_map = reinterpret_cast(*p0); + auto& right_null_map = reinterpret_cast(*p1); + auto left_size = left_null_map.size(); + auto right_size = right_null_map.size(); + + if (left_size < block->rows()) { + left_null_map.get_data().resize_fill(block->rows(), 0); + } + if (right_size < block->rows()) { + right_null_map.get_data().resize_fill(block->rows(), 0); + } + } + if (p._is_mark_join) { + vectorized::IColumn::Filter& mark_data = + assert_cast&>( + *block->get_by_position(block->columns() - 1).column->assume_mutable()) + .get_data(); + if (mark_data.size() < block->rows()) { + mark_data.resize_fill(block->rows(), 1); + } + } +} + +void NestedLoopJoinProbeLocalState::_reset_with_next_probe_row() { + // TODO: need a vector of left block to register the _probe_row_visited_flags + _current_build_pos = 0; + _left_block_pos++; +} + +void NestedLoopJoinProbeLocalState::add_tuple_is_null_column(vectorized::Block* block) { + auto& p = _parent->cast(); + if (p._is_outer_join) { + auto p0 = _tuple_is_null_left_flag_column->assume_mutable(); + auto p1 = _tuple_is_null_right_flag_column->assume_mutable(); + block->insert({std::move(p0), std::make_shared(), + "left_tuples_is_null"}); + block->insert({std::move(p1), std::make_shared(), + "right_tuples_is_null"}); + } +} + +template +Status NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* state, + JoinOpType& join_op_variants) { + auto& p = _parent->cast(); + constexpr bool ignore_null = JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN; + _left_block_start_pos = _left_block_pos; + _left_side_process_count = 0; + DCHECK(!_need_more_input_data || !_matched_rows_done); + + vectorized::MutableBlock mutable_join_block(&_join_block); + if (!_matched_rows_done && !_need_more_input_data) { + // We should try to join rows if there still are some rows from probe side. + while (_join_block.rows() < state->batch_size()) { + while (_current_build_pos == _shared_state->build_blocks.size() || + _left_block_pos == _left_block.rows()) { + // if left block is empty(), do not need disprocess the left block rows + if (_left_block.rows() > _left_block_pos) { + _left_side_process_count++; + } + + _reset_with_next_probe_row(); + if (_left_block_pos < _left_block.rows()) { + if constexpr (set_probe_side_flag) { + _probe_offset_stack.push(mutable_join_block.rows()); + } + } else { + if (_shared_state->left_side_eos) { + _matched_rows_done = true; + } else { + _need_more_input_data = true; + } + break; + } + } + + // Do not have left row need to be disposed + if (_matched_rows_done || _need_more_input_data) { + break; + } + + const auto& now_process_build_block = _shared_state->build_blocks[_current_build_pos++]; + if constexpr (set_build_side_flag) { + _build_offset_stack.push(mutable_join_block.rows()); + } + _process_left_child_block(mutable_join_block, now_process_build_block); + } + + if constexpr (set_probe_side_flag) { + Status status; + RETURN_IF_CATCH_EXCEPTION( + (status = _do_filtering_and_update_visited_flags< + set_build_side_flag, set_probe_side_flag, ignore_null>( + &_join_block, !p._is_left_semi_anti))); + _update_additional_flags(&_join_block); + if (!status.ok()) { + return status; + } + mutable_join_block = vectorized::MutableBlock(&_join_block); + // If this join operation is left outer join or full outer join, when + // `_left_side_process_count`, means all rows from build + // side have been joined with _left_side_process_count, we should output current + // probe row with null from build side. + if (_left_side_process_count) { + _finalize_current_phase( + mutable_join_block, state->batch_size()); + } + } + + if (_left_side_process_count) { + if (p._is_mark_join && _shared_state->build_blocks.empty()) { + DCHECK_EQ(JoinOpType::value, TJoinOp::CROSS_JOIN); + _append_left_data_with_null(mutable_join_block); + } + } + } + + if constexpr (!set_probe_side_flag) { + Status status; + RETURN_IF_CATCH_EXCEPTION( + (status = _do_filtering_and_update_visited_flags( + &_join_block, !p._is_right_semi_anti))); + _update_additional_flags(&_join_block); + mutable_join_block = vectorized::MutableBlock(&_join_block); + if (!status.ok()) { + return status; + } + } + + if constexpr (set_build_side_flag) { + if (_matched_rows_done && + _output_null_idx_build_side < _shared_state->build_blocks.size()) { + _finalize_current_phase( + mutable_join_block, state->batch_size()); + } + } + return Status::OK(); +} + +void NestedLoopJoinProbeLocalState::_resize_fill_tuple_is_null_column(size_t new_size, + int left_flag, + int right_flag) { + auto& p = _parent->cast(); + if (p._is_outer_join) { + reinterpret_cast(_tuple_is_null_left_flag_column.get()) + ->get_data() + .resize_fill(new_size, left_flag); + reinterpret_cast(_tuple_is_null_right_flag_column.get()) + ->get_data() + .resize_fill(new_size, right_flag); + } +} + +template +void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableBlock& mutable_block, + size_t batch_size) { + auto& p = _parent->cast(); + auto& dst_columns = mutable_block.mutable_columns(); + DCHECK_GT(dst_columns.size(), 0); + auto column_size = dst_columns[0]->size(); + if constexpr (BuildSide) { + DCHECK(!p._is_mark_join); + auto build_block_sz = _shared_state->build_blocks.size(); + size_t i = _output_null_idx_build_side; + for (; i < build_block_sz and column_size < batch_size; i++) { + const auto& cur_block = _shared_state->build_blocks[i]; + const auto* __restrict cur_visited_flags = + assert_cast( + _shared_state->build_side_visited_flags[i].get()) + ->get_data() + .data(); + const auto num_rows = cur_block.rows(); + + std::vector selector(num_rows); + size_t selector_idx = 0; + for (size_t j = 0; j < num_rows; j++) { + if constexpr (IsSemi) { + if (cur_visited_flags[j]) { + selector[selector_idx++] = j; + } + } else { + if (!cur_visited_flags[j]) { + selector[selector_idx++] = j; + } + } + } + + column_size += selector_idx; + for (size_t j = 0; j < p._num_probe_side_columns; ++j) { + DCHECK(p._join_op == TJoinOp::RIGHT_OUTER_JOIN || + p._join_op == TJoinOp::FULL_OUTER_JOIN || + p._join_op == TJoinOp::RIGHT_ANTI_JOIN || + p._join_op == TJoinOp::RIGHT_SEMI_JOIN); + dst_columns[j]->insert_many_defaults(selector_idx); + } + for (size_t j = 0; j < p._num_build_side_columns; ++j) { + auto src_column = cur_block.get_by_position(j); + if (!src_column.column->is_nullable() && + dst_columns[p._num_probe_side_columns + j]->is_nullable()) { + DCHECK(p._join_op == TJoinOp::FULL_OUTER_JOIN); + assert_cast( + dst_columns[p._num_probe_side_columns + j].get()) + ->get_nested_column_ptr() + ->insert_indices_from(*src_column.column, selector.data(), + selector.data() + selector_idx); + assert_cast( + dst_columns[p._num_probe_side_columns + j].get()) + ->get_null_map_column() + .get_data() + .resize_fill(column_size, 0); + } else { + dst_columns[p._num_probe_side_columns + j]->insert_indices_from( + *src_column.column.get(), selector.data(), + selector.data() + selector_idx); + } + } + _resize_fill_tuple_is_null_column(column_size, 1, 0); + } + _output_null_idx_build_side = i; + } else { + if (!p._is_mark_join) { + auto new_size = column_size; + DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows()); + for (int j = _left_block_start_pos; + j < _left_block_start_pos + _left_side_process_count; ++j) { + if (_cur_probe_row_visited_flags[j] == IsSemi) { + new_size++; + for (size_t i = 0; i < p._num_probe_side_columns; ++i) { + const vectorized::ColumnWithTypeAndName src_column = + _left_block.get_by_position(i); + if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { + DCHECK(p._join_op == TJoinOp::FULL_OUTER_JOIN); + assert_cast(dst_columns[i].get()) + ->get_nested_column_ptr() + ->insert_many_from(*src_column.column, j, 1); + assert_cast(dst_columns[i].get()) + ->get_null_map_column() + .get_data() + .resize_fill(new_size, 0); + } else { + dst_columns[i]->insert_many_from(*src_column.column, j, 1); + } + } + } + } + if (new_size > column_size) { + for (size_t i = 0; i < p._num_build_side_columns; ++i) { + dst_columns[p._num_probe_side_columns + i]->insert_many_defaults(new_size - + column_size); + } + _resize_fill_tuple_is_null_column(new_size, 0, 1); + } + } else { + vectorized::IColumn::Filter& mark_data = + assert_cast&>( + *dst_columns[dst_columns.size() - 1]) + .get_data(); + mark_data.reserve(mark_data.size() + _left_side_process_count); + DCHECK_LT(_left_block_pos, _left_block.rows()); + for (int j = _left_block_start_pos; + j < _left_block_start_pos + _left_side_process_count; ++j) { + mark_data.emplace_back(IsSemi != _cur_probe_row_visited_flags[j]); + for (size_t i = 0; i < p._num_probe_side_columns; ++i) { + const vectorized::ColumnWithTypeAndName src_column = + _left_block.get_by_position(i); + DCHECK(p._join_op != TJoinOp::FULL_OUTER_JOIN); + dst_columns[i]->insert_from(*src_column.column, j); + } + } + } + } +} + +void NestedLoopJoinProbeLocalState::_append_left_data_with_null( + vectorized::MutableBlock& mutable_block) const { + auto& p = _parent->cast(); + auto& dst_columns = mutable_block.mutable_columns(); + DCHECK(p._is_mark_join); + for (size_t i = 0; i < p._num_probe_side_columns; ++i) { + const vectorized::ColumnWithTypeAndName& src_column = _left_block.get_by_position(i); + if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { + auto origin_sz = dst_columns[i]->size(); + DCHECK(p._join_op == TJoinOp::RIGHT_OUTER_JOIN || + p._join_op == TJoinOp::FULL_OUTER_JOIN); + assert_cast(dst_columns[i].get()) + ->get_nested_column_ptr() + ->insert_range_from(*src_column.column, _left_block_start_pos, + _left_side_process_count); + assert_cast(dst_columns[i].get()) + ->get_null_map_column() + .get_data() + .resize_fill(origin_sz + 1, 0); + } else { + dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos, + _left_side_process_count); + } + } + for (size_t i = 0; i < p._num_build_side_columns; ++i) { + dst_columns[p._num_probe_side_columns + i]->insert_many_defaults(_left_side_process_count); + } + vectorized::IColumn::Filter& mark_data = + assert_cast&>( + *dst_columns[dst_columns.size() - 1]) + .get_data(); + mark_data.resize_fill(mark_data.size() + _left_side_process_count, 0); +} + +void NestedLoopJoinProbeLocalState::_process_left_child_block( + vectorized::MutableBlock& mutable_block, + const vectorized::Block& now_process_build_block) const { + auto& p = _parent->cast(); + auto& dst_columns = mutable_block.mutable_columns(); + const int max_added_rows = now_process_build_block.rows(); + for (size_t i = 0; i < p._num_probe_side_columns; ++i) { + const vectorized::ColumnWithTypeAndName& src_column = _left_block.get_by_position(i); + if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) { + auto origin_sz = dst_columns[i]->size(); + DCHECK(p._join_op == TJoinOp::RIGHT_OUTER_JOIN || + p._join_op == TJoinOp::FULL_OUTER_JOIN); + assert_cast(dst_columns[i].get()) + ->get_nested_column_ptr() + ->insert_many_from(*src_column.column, _left_block_pos, max_added_rows); + assert_cast(dst_columns[i].get()) + ->get_null_map_column() + .get_data() + .resize_fill(origin_sz + max_added_rows, 0); + } else { + dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, max_added_rows); + } + } + for (size_t i = 0; i < p._num_build_side_columns; ++i) { + const vectorized::ColumnWithTypeAndName& src_column = + now_process_build_block.get_by_position(i); + if (!src_column.column->is_nullable() && + dst_columns[p._num_probe_side_columns + i]->is_nullable()) { + auto origin_sz = dst_columns[p._num_probe_side_columns + i]->size(); + DCHECK(p._join_op == TJoinOp::LEFT_OUTER_JOIN || + p._join_op == TJoinOp::FULL_OUTER_JOIN); + assert_cast( + dst_columns[p._num_probe_side_columns + i].get()) + ->get_nested_column_ptr() + ->insert_range_from(*src_column.column.get(), 0, max_added_rows); + assert_cast( + dst_columns[p._num_probe_side_columns + i].get()) + ->get_null_map_column() + .get_data() + .resize_fill(origin_sz + max_added_rows, 0); + } else { + dst_columns[p._num_probe_side_columns + i]->insert_range_from(*src_column.column.get(), + 0, max_added_rows); + } + } +} + +NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : JoinProbeOperatorX(pool, tnode, descs), + _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && + tnode.nested_loop_join_node.is_output_left_side_only) {} + +Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(JoinProbeOperatorX::init(tnode, state)); + + if (tnode.nested_loop_join_node.__isset.join_conjuncts && + !tnode.nested_loop_join_node.join_conjuncts.empty()) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees( + tnode.nested_loop_join_node.join_conjuncts, _join_conjuncts)); + } else if (tnode.nested_loop_join_node.__isset.vjoin_conjunct) { + vectorized::VExprContextSPtr context; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( + tnode.nested_loop_join_node.vjoin_conjunct, context)); + _join_conjuncts.emplace_back(context); + } + + return Status::OK(); +} + +Status NestedLoopJoinProbeOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state)); + for (auto& conjunct : _join_conjuncts) { + RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); + } + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); + _num_probe_side_columns = _child_x->row_desc().num_materialized_slots(); + _num_build_side_columns = _build_side_child->row_desc().num_materialized_slots(); + return Status::OK(); +} + +Status NestedLoopJoinProbeOperatorX::open(RuntimeState* state) { + RETURN_IF_ERROR(JoinProbeOperatorX::open(state)); + return vectorized::VExpr::open(_join_conjuncts, state); +} + +bool NestedLoopJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const { + auto& local_state = state->get_local_state(id())->cast(); + return local_state._need_more_input_data and !local_state._shared_state->left_side_eos and + local_state._join_block.rows() == 0; +} + +Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized::Block* block, + SourceState source_state) { + auto& local_state = state->get_local_state(id())->cast(); + COUNTER_UPDATE(local_state._probe_rows_counter, block->rows()); + local_state._cur_probe_row_visited_flags.resize(block->rows()); + std::fill(local_state._cur_probe_row_visited_flags.begin(), + local_state._cur_probe_row_visited_flags.end(), 0); + local_state._left_block_pos = 0; + local_state._need_more_input_data = false; + local_state._shared_state->left_side_eos = source_state == SourceState::FINISHED; + + if (!_is_output_left_side_only) { + auto func = [&](auto&& join_op_variants, auto set_build_side_flag, + auto set_probe_side_flag) { + return local_state.generate_join_block_data, + set_build_side_flag, set_probe_side_flag>( + state, join_op_variants); + }; + RETURN_IF_ERROR( + std::visit(func, local_state._shared_state->join_op_variants, + vectorized::make_bool_variant(_match_all_build || _is_right_semi_anti), + vectorized::make_bool_variant(_match_all_probe || _is_left_semi_anti))); + } + return Status::OK(); +} + +Status NestedLoopJoinProbeOperatorX::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + auto& local_state = state->get_local_state(id())->cast(); + if (need_more_input_data(state)) { + local_state._child_block->clear_column_data(); + RETURN_IF_ERROR(_child_x->get_block(state, local_state._child_block.get(), + local_state._child_source_state)); + source_state = local_state._child_source_state; + if (local_state._child_block->rows() == 0 && + local_state._child_source_state != SourceState::FINISHED) { + return Status::OK(); + } + RETURN_IF_ERROR( + push(state, local_state._child_block.get(), local_state._child_source_state)); + } + + if (!need_more_input_data(state)) { + RETURN_IF_ERROR(pull(state, block, source_state)); + if (source_state != SourceState::FINISHED && !need_more_input_data(state)) { + source_state = SourceState::MORE_DATA; + } else if (source_state != SourceState::FINISHED && + source_state == SourceState::MORE_DATA) { + source_state = local_state._child_source_state; + } + } + return Status::OK(); +} + +Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + auto& local_state = state->get_local_state(id())->cast(); + if (_is_output_left_side_only) { + RETURN_IF_ERROR(local_state._build_output_block(&local_state._left_block, block)); + source_state = + local_state._shared_state->left_side_eos ? SourceState::FINISHED : source_state; + local_state._need_more_input_data = !local_state._shared_state->left_side_eos; + } else { + source_state = ((_match_all_build || _is_right_semi_anti) + ? local_state._output_null_idx_build_side == + local_state._shared_state->build_blocks.size() && + local_state._matched_rows_done + : local_state._matched_rows_done) + ? SourceState::FINISHED + : source_state; + + { + vectorized::Block tmp_block = local_state._join_block; + + // Here make _join_block release the columns' ptr + local_state._join_block.set_columns(local_state._join_block.clone_empty_columns()); + + local_state.add_tuple_is_null_column(&tmp_block); + { + SCOPED_TIMER(local_state._join_filter_timer); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block( + local_state._conjuncts, &tmp_block, tmp_block.columns())); + } + RETURN_IF_ERROR(local_state._build_output_block(&tmp_block, block, false)); + local_state._reset_tuple_is_null_column(); + } + local_state._join_block.clear_column_data(); + + if (!(source_state == SourceState::FINISHED) and !local_state._need_more_input_data) { + auto func = [&](auto&& join_op_variants, auto set_build_side_flag, + auto set_probe_side_flag) { + return local_state + .generate_join_block_data, + set_build_side_flag, set_probe_side_flag>( + state, join_op_variants); + }; + RETURN_IF_ERROR(std::visit( + func, local_state._shared_state->join_op_variants, + vectorized::make_bool_variant(_match_all_build || _is_right_semi_anti), + vectorized::make_bool_variant(_match_all_probe || _is_left_semi_anti))); + } + } + + local_state.reached_limit(block, source_state); + return Status::OK(); +} + +bool NestedLoopJoinProbeOperatorX::can_read(RuntimeState* state) { + auto& local_state = state->get_local_state(id())->cast(); + return local_state._dependency->done(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 8fe9b1f883e7ef..b039eaaaf65617 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -21,6 +21,9 @@ #include "common/status.h" #include "operator.h" +#include "pipeline/exec/join_probe_operator.h" +#include "pipeline/pipeline_x/operator.h" +#include "util/simd/bits.h" #include "vec/exec/join/vnested_loop_join_node.h" namespace doris { @@ -48,5 +51,185 @@ class NestLoopJoinProbeOperator final : public StatefulOperator { +public: + using Parent = NestedLoopJoinProbeOperatorX; + ENABLE_FACTORY_CREATOR(NestedLoopJoinProbeLocalState); + NestedLoopJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent); + ~NestedLoopJoinProbeLocalState() = default; + + void add_tuple_is_null_column(vectorized::Block* block) override; +#define CLEAR_BLOCK \ + for (size_t i = 0; i < column_to_keep; ++i) { \ + block->get_by_position(i).column->assume_mutable()->clear(); \ + } + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status close(RuntimeState* state) override; + template + Status generate_join_block_data(RuntimeState* state, JoinOpType& join_op_variants); + +private: + friend class NestedLoopJoinProbeOperatorX; + void _update_additional_flags(vectorized::Block* block); + template + void _finalize_current_phase(vectorized::MutableBlock& mutable_block, size_t batch_size); + void _resize_fill_tuple_is_null_column(size_t new_size, int left_flag, int right_flag); + void _reset_with_next_probe_row(); + void _append_left_data_with_null(vectorized::MutableBlock& mutable_block) const; + void _process_left_child_block(vectorized::MutableBlock& mutable_block, + const vectorized::Block& now_process_build_block) const; + template + void _do_filtering_and_update_visited_flags_impl(vectorized::Block* block, int column_to_keep, + int build_block_idx, int processed_blocks_num, + bool materialize, Filter& filter) { + if constexpr (SetBuildSideFlag) { + for (size_t i = 0; i < processed_blocks_num; i++) { + auto& build_side_flag = + assert_cast( + _shared_state->build_side_visited_flags[build_block_idx].get()) + ->get_data(); + auto* __restrict build_side_flag_data = build_side_flag.data(); + auto cur_sz = build_side_flag.size(); + const size_t offset = _build_offset_stack.top(); + _build_offset_stack.pop(); + for (size_t j = 0; j < cur_sz; j++) { + build_side_flag_data[j] |= filter[offset + j]; + } + build_block_idx = build_block_idx == 0 ? _shared_state->build_blocks.size() - 1 + : build_block_idx - 1; + } + } + if constexpr (SetProbeSideFlag) { + int end = filter.size(); + for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 1 + : _left_block_pos; + i >= _left_block_start_pos; i--) { + int offset = 0; + if (!_probe_offset_stack.empty()) { + offset = _probe_offset_stack.top(); + _probe_offset_stack.pop(); + } + if (!_cur_probe_row_visited_flags[i]) { + _cur_probe_row_visited_flags[i] = + simd::contain_byte(filter.data() + offset, end - offset, 1) ? 1 + : 0; + } + end = offset; + } + } + if (materialize) { + vectorized::Block::filter_block_internal(block, filter, column_to_keep); + } else { + CLEAR_BLOCK + } + } + + // need exception safety + template + Status _do_filtering_and_update_visited_flags(vectorized::Block* block, bool materialize) { + auto column_to_keep = block->columns(); + // If we need to set visited flags for build side, + // 1. Execute conjuncts and get a column with bool type to do filtering. + // 2. Use bool column to update build-side visited flags. + // 3. Use bool column to do filtering. + size_t build_block_idx = _current_build_pos == 0 ? _shared_state->build_blocks.size() - 1 + : _current_build_pos - 1; + size_t processed_blocks_num = _build_offset_stack.size(); + if (LIKELY(!_join_conjuncts.empty() && block->rows() > 0)) { + vectorized::IColumn::Filter filter(block->rows(), 1); + bool can_filter_all = false; + RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts( + _join_conjuncts, nullptr, IgnoreNull, block, &filter, &can_filter_all)); + + if (can_filter_all) { + CLEAR_BLOCK + std::stack empty1; + _probe_offset_stack.swap(empty1); + + std::stack empty2; + _build_offset_stack.swap(empty2); + } else { + _do_filtering_and_update_visited_flags_impl( + block, column_to_keep, build_block_idx, processed_blocks_num, materialize, + filter); + } + } else if (block->rows() > 0) { + if constexpr (SetBuildSideFlag) { + for (size_t i = 0; i < processed_blocks_num; i++) { + auto& build_side_flag = + assert_cast( + _shared_state->build_side_visited_flags[build_block_idx].get()) + ->get_data(); + auto* __restrict build_side_flag_data = build_side_flag.data(); + auto cur_sz = build_side_flag.size(); + _build_offset_stack.pop(); + memset(reinterpret_cast(build_side_flag_data), 1, cur_sz); + build_block_idx = build_block_idx == 0 ? _shared_state->build_blocks.size() - 1 + : build_block_idx - 1; + } + } + if constexpr (SetProbeSideFlag) { + std::stack empty; + _probe_offset_stack.swap(empty); + std::fill(_cur_probe_row_visited_flags.begin(), _cur_probe_row_visited_flags.end(), + 1); + } + if (!materialize) { + CLEAR_BLOCK + } + } + vectorized::Block::erase_useless_column(block, column_to_keep); + return Status::OK(); + } + + bool _matched_rows_done; + int _left_block_start_pos = 0; + int _left_block_pos; // current scan pos in _left_block + int _left_side_process_count = 0; + bool _need_more_input_data = true; + // Visited flags for current row in probe side. + std::vector _cur_probe_row_visited_flags; + size_t _current_build_pos = 0; + // _left_block must be cleared before calling get_next(). The child node + // does not initialize all tuple ptrs in the row, only the ones that it + // is responsible for. + vectorized::Block _left_block; + vectorized::MutableColumns _dst_columns; + std::stack _build_offset_stack; + std::stack _probe_offset_stack; + uint64_t _output_null_idx_build_side = 0; + vectorized::VExprContextSPtrs _join_conjuncts; +}; + +class NestedLoopJoinProbeOperatorX final + : public JoinProbeOperatorX { +public: + NestedLoopJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs); + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + bool can_read(RuntimeState* state) override; + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + Status push(RuntimeState* state, vectorized::Block* input_block, SourceState source_state); + Status pull(doris::RuntimeState* state, vectorized::Block* output_block, + SourceState& source_state); + + bool need_more_input_data(RuntimeState* state) const; + +private: + friend class NestedLoopJoinProbeLocalState; + bool _is_output_left_side_only; + vectorized::VExprContextSPtrs _join_conjuncts; + size_t _num_probe_side_columns = 0; + size_t _num_build_side_columns = 0; +}; + } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index d248ccb29bf4d8..cf78fc5f2b2eb4 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -21,13 +21,13 @@ namespace doris::pipeline { -template Status JoinDependency::extract_join_column( +template Status HashJoinDependency::extract_join_column( vectorized::Block&, COW::mutable_ptr>&, std::vector>&, std::vector> const&); -template Status JoinDependency::extract_join_column( +template Status HashJoinDependency::extract_join_column( vectorized::Block&, COW::mutable_ptr>&, std::vector>&, @@ -229,9 +229,10 @@ bool AnalyticDependency::whether_need_next_partition(vectorized::BlockRowPos fou return false; } -Status JoinDependency::do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs, - RuntimeProfile::Counter& expr_call_timer, - std::vector& res_col_ids) { +Status HashJoinDependency::do_evaluate(vectorized::Block& block, + vectorized::VExprContextSPtrs& exprs, + RuntimeProfile::Counter& expr_call_timer, + std::vector& res_col_ids) { for (size_t i = 0; i < exprs.size(); ++i) { int result_col_id = -1; // execute build column @@ -248,7 +249,7 @@ Status JoinDependency::do_evaluate(vectorized::Block& block, vectorized::VExprCo return Status::OK(); } -std::vector JoinDependency::convert_block_to_null(vectorized::Block& block) { +std::vector HashJoinDependency::convert_block_to_null(vectorized::Block& block) { std::vector results; for (int i = 0; i < block.columns(); ++i) { if (auto& column_type = block.safe_get_by_position(i); !column_type.type->is_nullable()) { @@ -262,10 +263,10 @@ std::vector JoinDependency::convert_block_to_null(vectorized::Block& b } template -Status JoinDependency::extract_join_column(vectorized::Block& block, - vectorized::ColumnUInt8::MutablePtr& null_map, - vectorized::ColumnRawPtrs& raw_ptrs, - const std::vector& res_col_ids) { +Status HashJoinDependency::extract_join_column(vectorized::Block& block, + vectorized::ColumnUInt8::MutablePtr& null_map, + vectorized::ColumnRawPtrs& raw_ptrs, + const std::vector& res_col_ids) { for (size_t i = 0; i < _join_state.build_exprs_size; ++i) { if (_join_state.is_null_safe_eq_join[i]) { raw_ptrs[i] = block.get_by_position(res_col_ids[i]).column.get(); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 706f62657606df..6fc8544899debb 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -197,34 +197,37 @@ class AnalyticDependency final : public Dependency { }; struct JoinSharedState { - // mark the join column whether support null eq - std::vector is_null_safe_eq_join; - // mark the build hash table whether it needs to store null value - std::vector store_null_in_hash_table; // For some join case, we can apply a short circuit strategy // 1. _short_circuit_for_null_in_probe_side = true // 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti bool short_circuit_for_probe = false; + vectorized::JoinOpVariants join_op_variants; +}; + +struct HashJoinSharedState : public JoinSharedState { + // mark the join column whether support null eq + std::vector is_null_safe_eq_join; + // mark the build hash table whether it needs to store null value + std::vector store_null_in_hash_table; std::shared_ptr arena = std::make_shared(); // maybe share hash table with other fragment instances std::shared_ptr hash_table_variants = std::make_shared(); - vectorized::JoinOpVariants join_op_variants; // for full/right outer join vectorized::HashTableIteratorVariants outer_join_pull_visited_iter; vectorized::HashTableIteratorVariants probe_row_match_iter; - std::shared_ptr> build_blocks; vectorized::Sizes probe_key_sz; const std::vector build_side_child_desc; size_t build_exprs_size = 0; + std::shared_ptr> build_blocks; }; -class JoinDependency final : public Dependency { +class HashJoinDependency final : public Dependency { public: - using SharedState = JoinSharedState; - JoinDependency(int id) : Dependency(id) {} - ~JoinDependency() override = default; + using SharedState = HashJoinSharedState; + HashJoinDependency(int id) : Dependency(id) {} + ~HashJoinDependency() override = default; void* shared_state() override { return (void*)&_join_state; } @@ -240,7 +243,28 @@ class JoinDependency final : public Dependency { const std::vector& res_col_ids); private: - JoinSharedState _join_state; + HashJoinSharedState _join_state; +}; + +struct NestedLoopJoinSharedState : public JoinSharedState { + // if true, left child has no more rows to process + bool left_side_eos = false; + // Visited flags for each row in build side. + vectorized::MutableColumns build_side_visited_flags; + // List of build blocks, constructed in prepare() + vectorized::Blocks build_blocks; +}; + +class NestedLoopJoinDependency final : public Dependency { +public: + using SharedState = NestedLoopJoinSharedState; + NestedLoopJoinDependency(int id) : Dependency(id) {} + ~NestedLoopJoinDependency() override = default; + + void* shared_state() override { return (void*)&_join_state; } + +private: + NestedLoopJoinSharedState _join_state; }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 110acff33f689c..eb333df533fc0f 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -25,6 +25,8 @@ #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" +#include "pipeline/exec/nested_loop_join_build_operator.h" +#include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/result_sink_operator.h" #include "pipeline/exec/sort_sink_operator.h" @@ -218,6 +220,7 @@ DECLARE_OPERATOR_X(SortSinkLocalState) DECLARE_OPERATOR_X(BlockingAggSinkLocalState) DECLARE_OPERATOR_X(StreamingAggSinkLocalState) DECLARE_OPERATOR_X(ExchangeSinkLocalState) +DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState) #undef DECLARE_OPERATOR_X @@ -228,6 +231,7 @@ DECLARE_OPERATOR_X(AnalyticLocalState) DECLARE_OPERATOR_X(SortLocalState) DECLARE_OPERATOR_X(AggLocalState) DECLARE_OPERATOR_X(ExchangeLocalState) +DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState) #undef DECLARE_OPERATOR_X diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index cdb8e689f59692..c9ccd783c2bf67 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -51,6 +51,8 @@ #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" +#include "pipeline/exec/nested_loop_join_build_operator.h" +#include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/result_sink_operator.h" #include "pipeline/exec/scan_operator.h" @@ -568,6 +570,24 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _build_side_pipelines.insert({sink->id(), build_side_pipe}); break; } + case TPlanNodeType::CROSS_JOIN_NODE: { + op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (_dag.find(downstream_pipeline_id) == _dag.end()) { + _dag.insert({downstream_pipeline_id, {}}); + } + PipelinePtr build_side_pipe = add_pipeline(); + _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); + + DataSinkOperatorXPtr sink; + sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); + RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); + _build_side_pipelines.insert({sink->id(), build_side_pipe}); + break; + } case TPlanNodeType::SORT_NODE: { op.reset(new SortSourceOperatorX(pool, tnode, descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index a3b6c30c9fb4dd..82fecf5793b3dd 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -32,6 +32,8 @@ #include #include +#include "pipeline/exec/nested_loop_join_build_operator.h" + // IWYU pragma: no_include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" @@ -64,35 +66,32 @@ class ObjectPool; namespace doris::vectorized { -struct RuntimeFilterBuild { - RuntimeFilterBuild(VNestedLoopJoinNode* join_node) : _join_node(join_node) {} - - Status operator()(RuntimeState* state) { - if (_join_node->_runtime_filter_descs.empty()) { - return Status::OK(); - } - VRuntimeFilterSlotsCross runtime_filter_slots(_join_node->_runtime_filter_descs, - _join_node->_filter_src_expr_ctxs); +template +Status RuntimeFilterBuild::operator()(RuntimeState* state) { + if (_parent->runtime_filter_descs().empty()) { + return Status::OK(); + } + VRuntimeFilterSlotsCross runtime_filter_slots(_parent->runtime_filter_descs(), + _parent->filter_src_expr_ctxs()); - RETURN_IF_ERROR(runtime_filter_slots.init(state)); + RETURN_IF_ERROR(runtime_filter_slots.init(state)); - if (!runtime_filter_slots.empty() && !_join_node->_build_blocks.empty()) { - SCOPED_TIMER(_join_node->_push_compute_timer); - for (auto& build_block : _join_node->_build_blocks) { - runtime_filter_slots.insert(&build_block); - } - } - { - SCOPED_TIMER(_join_node->_push_down_timer); - RETURN_IF_ERROR(runtime_filter_slots.publish()); + if (!runtime_filter_slots.empty() && !_parent->build_blocks().empty()) { + SCOPED_TIMER(_parent->push_compute_timer()); + for (auto& build_block : _parent->build_blocks()) { + runtime_filter_slots.insert(&build_block); } - - return Status::OK(); } + { + SCOPED_TIMER(_parent->push_down_timer()); + RETURN_IF_ERROR(runtime_filter_slots.publish()); + } + + return Status::OK(); +} -private: - VNestedLoopJoinNode* _join_node; -}; +template struct RuntimeFilterBuild; +template struct RuntimeFilterBuild; VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) @@ -213,7 +212,7 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState* state, vectorized::Block* if (eos) { COUNTER_UPDATE(_build_rows_counter, _build_rows); - RuntimeFilterBuild(this)(state); + RuntimeFilterBuild(this)(state); // optimize `in bitmap`, see https://github.com/apache/doris/issues/14338 if (_is_output_left_side_only && diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h index 03676629bcc4eb..6194ebaf340272 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.h +++ b/be/src/vec/exec/join/vnested_loop_join_node.h @@ -47,6 +47,15 @@ class VExprContext; namespace doris::vectorized { +template +struct RuntimeFilterBuild { + RuntimeFilterBuild(Parent* parent) : _parent(parent) {} + Status operator()(RuntimeState* state); + +private: + Parent* _parent; +}; + // Node for nested loop joins. class VNestedLoopJoinNode final : public VJoinNodeBase { public: @@ -88,6 +97,12 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { Block* get_left_block() { return &_left_block; } + std::vector& runtime_filter_descs() { return _runtime_filter_descs; } + VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; } + RuntimeProfile::Counter* push_compute_timer() { return _push_compute_timer; } + Blocks& build_blocks() { return _build_blocks; } + RuntimeProfile::Counter* push_down_timer() { return _push_down_timer; } + private: template Status _generate_join_block_data(RuntimeState* state, JoinOpType& join_op_variants) { @@ -262,6 +277,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase { std::stack _probe_offset_stack; VExprContextSPtrs _join_conjuncts; + template friend struct RuntimeFilterBuild; }; From 6ed150fe1b8473e6fefda4d60d5c7dbf73ddafde Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 1 Sep 2023 16:54:28 +0800 Subject: [PATCH 3/3] update --- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- .../exec/nested_loop_join_build_operator.cpp | 1 + .../exec/nested_loop_join_build_operator.h | 2 +- .../exec/nested_loop_join_probe_operator.cpp | 3 ++- .../exec/nested_loop_join_probe_operator.h | 10 ++++++++++ be/src/pipeline/exec/scan_operator.h | 2 ++ be/src/vec/exec/scan/scanner_context.cpp | 4 ++++ be/src/vec/exec/scan/scanner_context.h | 2 +- be/src/vec/exec/scan/scanner_scheduler.cpp | 18 ++++++++---------- be/src/vec/exec/scan/vscanner.cpp | 3 ++- 10 files changed, 32 insertions(+), 15 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index be059defa7aa77..4bd1bb200704f4 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -124,7 +124,7 @@ class HashJoinBuildSinkOperatorX final SourceState source_state) override; Status close(RuntimeState* state) override; - virtual bool can_write(RuntimeState* state) override { return true; } + bool can_write(RuntimeState* state) override { return true; } private: friend class HashJoinBuildSinkLocalState; diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index c0c04fdb123456..5bd17ae3b10b7f 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -113,6 +113,7 @@ Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vector !local_state._shared_state->build_blocks.empty()))) { local_state._shared_state->left_side_eos = true; } + local_state._dependency->set_done(); } return Status::OK(); diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 5c4ce864e3422c..20c96343194e41 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -90,7 +90,7 @@ class NestedLoopJoinBuildSinkOperatorX final Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - virtual bool can_write(RuntimeState* state) override { return true; } + bool can_write(RuntimeState* state) override { return true; } private: friend class NestedLoopJoinBuildSinkLocalState; diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 0665a9e6f9a496..888d39dde3a03f 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -441,7 +441,8 @@ NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, con const DescriptorTbl& descs) : JoinProbeOperatorX(pool, tnode, descs), _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && - tnode.nested_loop_join_node.is_output_left_side_only) {} + tnode.nested_loop_join_node.is_output_left_side_only), + _old_version_flag(!tnode.__isset.nested_loop_join_node) {} Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(JoinProbeOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index b039eaaaf65617..65f8f73593da35 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -220,6 +220,15 @@ class NestedLoopJoinProbeOperatorX final Status push(RuntimeState* state, vectorized::Block* input_block, SourceState source_state); Status pull(doris::RuntimeState* state, vectorized::Block* output_block, SourceState& source_state); + const RowDescriptor& intermediate_row_desc() const override { + return _old_version_flag ? _row_descriptor : *_intermediate_row_desc; + } + + const RowDescriptor& row_desc() override { + return _old_version_flag + ? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor) + : *_output_row_desc; + } bool need_more_input_data(RuntimeState* state) const; @@ -229,6 +238,7 @@ class NestedLoopJoinProbeOperatorX final vectorized::VExprContextSPtrs _join_conjuncts; size_t _num_probe_side_columns = 0; size_t _num_build_side_columns = 0; + const bool _old_version_flag; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 83589a5153fb7d..6db073b098cea4 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -81,6 +81,8 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::Runt virtual TPushAggOp::type get_push_down_agg_type() = 0; + [[nodiscard]] std::string get_name() { return _parent->get_name(); } + protected: friend class vectorized::ScannerContext; friend class vectorized::VScanner; diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index cbd4ae57be576e..626b7eb5c2e4bb 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -145,6 +145,10 @@ Status ScannerContext::init() { return Status::OK(); } +std::string ScannerContext::parent_name() { + return _parent ? _parent->get_name() : _local_state->get_name(); +} + vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block, bool get_block_not_empty) { vectorized::BlockUPtr block; diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index dd0439e2780faf..af7e8334ab14ac 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -139,7 +139,7 @@ class ScannerContext { void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); } void incr_num_scanner_scheduling(int64_t num) { _scanner_sched_counter->update(num); } - VScanNode* parent() { return _parent; } + std::string parent_name(); virtual bool empty_in_queue(int id); diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index abe6d818af9992..87903e480aa3a5 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -221,9 +221,8 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { ctx->set_status_on_error(s); // debug case failure, to be removed if (ctx->state()->enable_profile()) { - LOG(WARNING) - << "debug case failure " << print_id(ctx->state()->query_id()) - << " " << ctx->parent()->get_name() << ": submit_func error: " << s; + LOG(WARNING) << "debug case failure " << print_id(ctx->state()->query_id()) + << " " << ctx->parent_name() << ": submit_func error: " << s; } break; } @@ -263,7 +262,7 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { // debug case failure, to be removed if (ctx->state()->enable_profile()) { LOG(WARNING) << "debug case failure " << print_id(ctx->state()->query_id()) - << " " << ctx->parent()->get_name() << ": submit_func error2"; + << " " << ctx->parent_name() << ": submit_func error2"; } break; } @@ -276,7 +275,7 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { // debug case failure, to be removed if (ctx->state()->enable_profile()) { LOG(WARNING) << "debug case failure " << print_id(ctx->state()->query_id()) << " " - << ctx->parent()->get_name() << ": USE_BTHREAD_SCANNER"; + << ctx->parent_name() << ": USE_BTHREAD_SCANNER"; } // Only OlapScanner uses bthread scanner // Todo: Make other scanners support bthread scanner @@ -321,7 +320,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext // debug case failure, to be removed if (ctx->state()->enable_profile()) { LOG(WARNING) << "debug case failure " << print_id(ctx->state()->query_id()) << " " - << ctx->parent()->get_name() << ": ScannerScheduler::_scanner_scan"; + << ctx->parent_name() << ": ScannerScheduler::_scanner_scan"; } SCOPED_ATTACH_TASK(scanner->runtime_state()); #if !defined(USE_BTHREAD_SCANNER) @@ -345,7 +344,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext // debug case failure, to be removed if (ctx->state()->enable_profile()) { LOG(WARNING) << "debug case failure " << print_id(ctx->state()->query_id()) << " " - << ctx->parent()->get_name() + << ctx->parent_name() << ": ScannerScheduler::_scanner_scan scanner->init eos"; } } @@ -358,7 +357,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext // debug case failure, to be removed if (ctx->state()->enable_profile()) { LOG(WARNING) << "debug case failure " << print_id(ctx->state()->query_id()) << " " - << ctx->parent()->get_name() + << ctx->parent_name() << ": ScannerScheduler::_scanner_scan scanner->open eos"; } } @@ -399,8 +398,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext // debug case failure, to be removed if (ctx->state()->enable_profile()) { LOG(WARNING) << "debug case failure " << print_id(ctx->state()->query_id()) << " " - << ctx->parent()->get_name() - << ": ScannerScheduler::_scanner_scan ctx->done"; + << ctx->parent_name() << ": ScannerScheduler::_scanner_scan ctx->done"; } break; } diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 2008f899214da1..fdf588bba14616 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -65,7 +65,8 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { // debug case failure, to be removed if (state->enable_profile()) { LOG(WARNING) << "debug case failure " << print_id(state->query_id()) << " " - << _parent->get_name() << ": VScanner::get_block"; + << (_parent ? _parent->get_name() : _local_state->get_name()) + << ": VScanner::get_block"; } // only empty block should be here DCHECK(block->rows() == 0);