Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class HashJoinBuildSink final : public StreamingOperator<HashJoinBuildSinkBuilde
class HashJoinBuildSinkOperatorX;

class HashJoinBuildSinkLocalState final
: public JoinBuildSinkLocalState<JoinDependency, HashJoinBuildSinkLocalState> {
: public JoinBuildSinkLocalState<HashJoinDependency, HashJoinBuildSinkLocalState> {
public:
ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState);
using Parent = HashJoinBuildSinkOperatorX;
Expand Down Expand Up @@ -124,7 +124,7 @@ class HashJoinBuildSinkOperatorX final
SourceState source_state) override;
Status close(RuntimeState* state) override;

virtual bool can_write(RuntimeState* state) override { return true; }
bool can_write(RuntimeState* state) override { return true; }

private:
friend class HashJoinBuildSinkLocalState;
Expand Down
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ namespace pipeline {
OPERATOR_CODE_GENERATOR(HashJoinProbeOperator, StatefulOperator)

HashJoinProbeLocalState::HashJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent)
: JoinProbeLocalState<JoinDependency, HashJoinProbeLocalState>(state, parent),
_child_block(vectorized::Block::create_unique()),
_child_source_state(SourceState::DEPEND_ON_SOURCE) {}
: JoinProbeLocalState<HashJoinDependency, HashJoinProbeLocalState>(state, parent) {}

Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
Expand Down Expand Up @@ -96,7 +94,7 @@ Status HashJoinProbeLocalState::close(RuntimeState* state) {
_tuple_is_null_left_flag_column = nullptr;
_tuple_is_null_right_flag_column = nullptr;
_probe_block.clear();
return JoinProbeLocalState<JoinDependency, HashJoinProbeLocalState>::close(state);
return JoinProbeLocalState<HashJoinDependency, HashJoinProbeLocalState>::close(state);
}

bool HashJoinProbeLocalState::_need_probe_null_map(vectorized::Block& block,
Expand Down
7 changes: 2 additions & 5 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class HashJoinProbeOperator final : public StatefulOperator<HashJoinProbeOperato

class HashJoinProbeOperatorX;
class HashJoinProbeLocalState final
: public JoinProbeLocalState<JoinDependency, HashJoinProbeLocalState> {
: public JoinProbeLocalState<HashJoinDependency, HashJoinProbeLocalState> {
public:
using Parent = HashJoinProbeOperatorX;
ENABLE_FACTORY_CREATOR(HashJoinProbeLocalState);
Expand All @@ -59,7 +59,7 @@ class HashJoinProbeLocalState final
Status close(RuntimeState* state) override;

void prepare_for_next();
void add_tuple_is_null_column(vectorized::Block* block);
void add_tuple_is_null_column(vectorized::Block* block) override;
void init_for_probe(RuntimeState* state);

HashJoinProbeOperatorX* join_probe() { return (HashJoinProbeOperatorX*)_parent; }
Expand All @@ -70,9 +70,6 @@ class HashJoinProbeLocalState final
friend class HashJoinProbeOperatorX;
friend struct vectorized::HashJoinProbeContext;

std::unique_ptr<vectorized::Block> _child_block;
SourceState _child_source_state;

int _probe_index = -1;
bool _probe_eos = false;
std::atomic<bool> _probe_inited = false;
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/join_build_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "join_build_sink_operator.h"

#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/pipeline_x/operator.h"

namespace doris::pipeline {
Expand Down Expand Up @@ -117,6 +118,8 @@ void JoinBuildSinkOperatorX<LocalStateType>::_init_join_op() {
}

template class JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>;
template class JoinBuildSinkLocalState<JoinDependency, HashJoinBuildSinkLocalState>;
template class JoinBuildSinkLocalState<HashJoinDependency, HashJoinBuildSinkLocalState>;
template class JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>;
template class JoinBuildSinkLocalState<NestedLoopJoinDependency, NestedLoopJoinBuildSinkLocalState>;

} // namespace doris::pipeline
35 changes: 17 additions & 18 deletions be/src/pipeline/exec/join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
#include "join_probe_operator.h"

#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/operator.h"

namespace doris::pipeline {

template <typename DependencyType, typename Derived>
Status JoinProbeLocalState<DependencyType, Derived>::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<DependencyType>::init(state, info));
auto& p =
PipelineXLocalState<DependencyType>::_parent->template cast<typename Derived::Parent>();
RETURN_IF_ERROR(Base::init(state, info));
auto& p = Base::_parent->template cast<typename Derived::Parent>();
// only use in outer join as the bool column to mark for function of `tuple_is_null`
if (p._is_outer_join) {
_tuple_is_null_left_flag_column = vectorized::ColumnUInt8::create();
Expand All @@ -38,8 +38,7 @@ Status JoinProbeLocalState<DependencyType, Derived>::init(RuntimeState* state,
RETURN_IF_ERROR(p._output_expr_ctxs[i]->clone(state, _output_expr_ctxs[i]));
}

_probe_phase_profile =
PipelineXLocalState<DependencyType>::profile()->create_child("ProbePhase", true, true);
_probe_phase_profile = Base::profile()->create_child("ProbePhase", true, true);
_probe_timer = ADD_TIMER(_probe_phase_profile, "ProbeTime");
_join_filter_timer = ADD_CHILD_TIMER(_probe_phase_profile, "JoinFilterTimer", "ProbeTime");
_build_output_block_timer =
Expand All @@ -51,17 +50,16 @@ Status JoinProbeLocalState<DependencyType, Derived>::init(RuntimeState* state,

template <typename DependencyType, typename Derived>
Status JoinProbeLocalState<DependencyType, Derived>::close(RuntimeState* state) {
if (PipelineXLocalState<DependencyType>::_closed) {
if (Base::_closed) {
return Status::OK();
}
_join_block.clear();
return PipelineXLocalState<DependencyType>::close(state);
return Base::close(state);
}

template <typename DependencyType, typename Derived>
void JoinProbeLocalState<DependencyType, Derived>::_construct_mutable_join_block() {
auto& p =
PipelineXLocalState<DependencyType>::_parent->template cast<typename Derived::Parent>();
auto& p = Base::_parent->template cast<typename Derived::Parent>();
const auto& mutable_block_desc = p._intermediate_row_desc;
for (const auto tuple_desc : mutable_block_desc->tuple_descriptors()) {
for (const auto slot_desc : tuple_desc->slots()) {
Expand All @@ -79,8 +77,7 @@ void JoinProbeLocalState<DependencyType, Derived>::_construct_mutable_join_block
template <typename DependencyType, typename Derived>
Status JoinProbeLocalState<DependencyType, Derived>::_build_output_block(
vectorized::Block* origin_block, vectorized::Block* output_block, bool keep_origin) {
auto& p =
PipelineXLocalState<DependencyType>::_parent->template cast<typename Derived::Parent>();
auto& p = Base::_parent->template cast<typename Derived::Parent>();
SCOPED_TIMER(_build_output_block_timer);
auto is_mem_reuse = output_block->mem_reuse();
vectorized::MutableBlock mutable_block =
Expand Down Expand Up @@ -119,7 +116,7 @@ Status JoinProbeLocalState<DependencyType, Derived>::_build_output_block(
}
} else {
DCHECK(mutable_columns.size() == p.row_desc().num_materialized_slots());
SCOPED_TIMER(PipelineXLocalState<DependencyType>::_projection_timer);
SCOPED_TIMER(Base::_projection_timer);
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id));
Expand Down Expand Up @@ -151,8 +148,7 @@ Status JoinProbeLocalState<DependencyType, Derived>::_build_output_block(

template <typename DependencyType, typename Derived>
void JoinProbeLocalState<DependencyType, Derived>::_reset_tuple_is_null_column() {
if (PipelineXLocalState<DependencyType>::_parent->template cast<typename Derived::Parent>()
._is_outer_join) {
if (Base::_parent->template cast<typename Derived::Parent>()._is_outer_join) {
reinterpret_cast<vectorized::ColumnUInt8&>(*_tuple_is_null_left_flag_column).clear();
reinterpret_cast<vectorized::ColumnUInt8&>(*_tuple_is_null_right_flag_column).clear();
}
Expand All @@ -161,7 +157,7 @@ void JoinProbeLocalState<DependencyType, Derived>::_reset_tuple_is_null_column()
template <typename LocalStateType>
JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: OperatorX<LocalStateType>(pool, tnode, descs),
: Base(pool, tnode, descs),
_join_op(tnode.__isset.hash_join_node ? tnode.hash_join_node.join_op
: (tnode.__isset.nested_loop_join_node
? tnode.nested_loop_join_node.join_op
Expand Down Expand Up @@ -210,7 +206,7 @@ JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T

template <typename LocalStateType>
Status JoinProbeOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<LocalStateType>::init(tnode, state));
RETURN_IF_ERROR(Base::init(tnode, state));
if (tnode.__isset.hash_join_node || tnode.__isset.nested_loop_join_node) {
const auto& output_exprs = tnode.__isset.hash_join_node
? tnode.hash_join_node.srcExprList
Expand All @@ -227,11 +223,14 @@ Status JoinProbeOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeS

template <typename LocalStateType>
Status JoinProbeOperatorX<LocalStateType>::open(doris::RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<LocalStateType>::open(state));
RETURN_IF_ERROR(Base::open(state));
return vectorized::VExpr::open(_output_expr_ctxs, state);
}

template class JoinProbeLocalState<JoinDependency, HashJoinProbeLocalState>;
template class JoinProbeLocalState<HashJoinDependency, HashJoinProbeLocalState>;
template class JoinProbeOperatorX<HashJoinProbeLocalState>;

template class JoinProbeLocalState<NestedLoopJoinDependency, NestedLoopJoinProbeLocalState>;
template class JoinProbeOperatorX<NestedLoopJoinProbeLocalState>;

} // namespace doris::pipeline
12 changes: 10 additions & 2 deletions be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ class JoinProbeOperatorX;
template <typename DependencyType, typename Derived>
class JoinProbeLocalState : public PipelineXLocalState<DependencyType> {
public:
using Base = PipelineXLocalState<DependencyType>;
virtual Status init(RuntimeState* state, LocalStateInfo& info) override;
virtual Status close(RuntimeState* state) override;
virtual void add_tuple_is_null_column(vectorized::Block* block) = 0;

protected:
JoinProbeLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<DependencyType>(state, parent) {}
: Base(state, parent),
_child_block(vectorized::Block::create_unique()),
_child_source_state(SourceState::DEPEND_ON_SOURCE) {}
virtual ~JoinProbeLocalState() = default;
void _construct_mutable_join_block();
Status _build_output_block(vectorized::Block* origin_block, vectorized::Block* output_block,
bool keep_origin);
bool keep_origin = true);
void _reset_tuple_is_null_column();
// output expr
vectorized::VExprContextSPtrs _output_expr_ctxs;
Expand All @@ -52,11 +56,15 @@ class JoinProbeLocalState : public PipelineXLocalState<DependencyType> {
RuntimeProfile::Counter* _probe_rows_counter;
RuntimeProfile::Counter* _join_filter_timer;
RuntimeProfile::Counter* _build_output_block_timer;

std::unique_ptr<vectorized::Block> _child_block;
SourceState _child_source_state;
};

template <typename LocalStateType>
class JoinProbeOperatorX : public OperatorX<LocalStateType> {
public:
using Base = OperatorX<LocalStateType>;
JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
virtual Status init(const TPlanNode& tnode, RuntimeState* state) override;

Expand Down
96 changes: 95 additions & 1 deletion be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,98 @@ namespace doris::pipeline {

OPERATOR_CODE_GENERATOR(NestLoopJoinBuildOperator, StreamingOperator)

} // namespace doris::pipeline
// Builds the local sink state for the nested loop join build side. Both
// arguments are handed, unchanged, to the JoinBuildSinkLocalState base.
NestedLoopJoinBuildSinkLocalState::NestedLoopJoinBuildSinkLocalState(DataSinkOperatorXBase* parent,
                                                                     RuntimeState* state)
        // The injected class name refers to the base specialization
        // JoinBuildSinkLocalState<NestedLoopJoinDependency, NestedLoopJoinBuildSinkLocalState>.
        : JoinBuildSinkLocalState(parent, state) {}

// Runs the base-class init, then fills this local state's
// `_filter_src_expr_ctxs` with clones of the parent operator's contexts
// (one clone per parent context, in the same order).
//
// Returns the first non-OK status from the base init or from a clone call;
// Status::OK() otherwise.
Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
    RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));

    auto& op = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
    auto& parent_ctxs = op._filter_src_expr_ctxs;

    // Size the local list first so every parent context has a destination slot.
    _filter_src_expr_ctxs.resize(parent_ctxs.size());

    size_t slot = 0;
    for (auto& local_ctx : _filter_src_expr_ctxs) {
        RETURN_IF_ERROR(parent_ctxs[slot]->clone(state, local_ctx));
        ++slot;
    }
    return Status::OK();
}

// Exposes the runtime filter descriptors held by the parent
// NestedLoopJoinBuildSinkOperatorX. The returned reference points into the
// parent operator; nothing is copied.
const std::vector<TRuntimeFilterDesc>& NestedLoopJoinBuildSinkLocalState::runtime_filter_descs() {
    auto& op = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
    return op._runtime_filter_descs;
}

// Constructs the nested loop join build sink operator from its plan node.
//
// pool, tnode and descs are forwarded to the JoinBuildSinkOperatorX base.
// In addition this constructor captures:
//   - the plan node's runtime filter descriptors (`tnode.runtime_filters`);
//   - whether only the left side is output, which is true only when the
//     thrift field `is_output_left_side_only` is both set and true;
//   - a row descriptor built from `descs`, `tnode.row_tuples` and
//     `tnode.nullable_tuples`.
NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>(pool, tnode, descs),
_runtime_filter_descs(tnode.runtime_filters),
// Check the __isset flag first so an unset optional field reads as false.
_is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only &&
tnode.nested_loop_join_node.is_output_left_side_only),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}

// Initializes the operator from its plan node.
//
// After the base-class init succeeds, each runtime filter descriptor is
// registered as a producer filter with the state's runtime filter manager,
// and its source expression is collected. The collected expressions are then
// turned into expression trees stored in `_filter_src_expr_ctxs`.
//
// Returns the first non-OK status encountered; Status::OK() otherwise.
Status NestedLoopJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
    RETURN_IF_ERROR(JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>::init(tnode, state));

    std::vector<TExpr> src_exprs;
    for (auto& filter_desc : _runtime_filter_descs) {
        // Collect the expression before registering, as the original order does.
        src_exprs.push_back(filter_desc.src_expr);
        RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
                filter_desc, state->query_options()));
    }

    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(src_exprs, _filter_src_expr_ctxs));
    return Status::OK();
}

// Prepares the operator for execution.
//
// For every tuple descriptor of the child's row descriptor, looks up its
// index in this operator's `_row_descriptor` and passes the id/index pair to
// RETURN_IF_INVALID_TUPLE_IDX (presumably an early error return when the
// index is invalid -- confirm against the macro definition). Afterwards the
// filter source expressions are prepared against the child's row descriptor.
Status NestedLoopJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
    auto&& child_row_desc = _child_x->row_desc();

    for (TupleDescriptor* build_tuple : child_row_desc.tuple_descriptors()) {
        auto idx_in_output = _row_descriptor.get_tuple_idx(build_tuple->id());
        RETURN_IF_INVALID_TUPLE_IDX(build_tuple->id(), idx_in_output);
    }

    RETURN_IF_ERROR(vectorized::VExpr::prepare(_filter_src_expr_ctxs, state, child_row_desc));
    return Status::OK();
}

// Opens the operator by opening its filter source expression contexts.
// Returns the status reported by vectorized::VExpr::open.
Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) {
    RETURN_IF_ERROR(vectorized::VExpr::open(_filter_src_expr_ctxs, state));
    return Status::OK();
}

// Consumes one build-side block.
//
// A non-empty block is moved into the shared state's `build_blocks`, and the
// local row / memory counters are updated. When `_match_all_build` or
// `_is_right_semi_anti` is set, a zero-filled UInt8 column with one entry per
// row is appended to `build_side_visited_flags`.
//
// When `source_state` is FINISHED, the build row counter is published, the
// runtime filters are built, the left-side-eos shortcut is evaluated, and the
// dependency is marked done.
//
// Returns a non-OK status only if building the runtime filters fails.
Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* block,
                                              SourceState source_state) {
    auto& sink_state =
            state->get_sink_local_state(id())->cast<NestedLoopJoinBuildSinkLocalState>();
    SCOPED_TIMER(sink_state._build_timer);

    // Read both values up front: the block is moved from below.
    auto num_rows = block->rows();
    auto block_bytes = block->allocated_bytes();

    if (num_rows != 0) {
        sink_state._build_rows += num_rows;
        sink_state._total_mem_usage += block_bytes;
        sink_state._shared_state->build_blocks.emplace_back(std::move(*block));

        const bool track_visited = _match_all_build || _is_right_semi_anti;
        if (track_visited) {
            sink_state._shared_state->build_side_visited_flags.emplace_back(
                    vectorized::ColumnUInt8::create(num_rows, 0));
        }
    }

    // Nothing more to do until the upstream reports that it is finished.
    if (source_state != SourceState::FINISHED) {
        return Status::OK();
    }

    COUNTER_UPDATE(sink_state._build_rows_counter, sink_state._build_rows);
    vectorized::RuntimeFilterBuild<NestedLoopJoinBuildSinkLocalState> rf_ctx(&sink_state);
    RETURN_IF_ERROR(rf_ctx(state));

    // optimize `in bitmap`, see https://github.com/apache/doris/issues/14338
    const bool build_side_empty = sink_state._shared_state->build_blocks.empty();
    const bool semi_join_without_build_rows =
            _join_op == TJoinOp::type::LEFT_SEMI_JOIN && build_side_empty;
    const bool anti_join_with_build_rows =
            _join_op == TJoinOp::type::LEFT_ANTI_JOIN && !build_side_empty;
    if (_is_output_left_side_only &&
        (semi_join_without_build_rows || anti_join_with_build_rows)) {
        sink_state._shared_state->left_side_eos = true;
    }

    sink_state._dependency->set_done();
    return Status::OK();
}

} // namespace doris::pipeline
Loading