Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/pipeline/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ set(PIPELINE_FILES
exec/union_sink_operator.cpp
exec/union_source_operator.cpp
exec/data_queue.cpp
exec/select_operator.cpp)
exec/select_operator.cpp
exec/empty_source_operator.cpp)

add_library(Pipeline STATIC
${PIPELINE_FILES}
Expand Down
25 changes: 25 additions & 0 deletions be/src/pipeline/exec/empty_source_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "empty_source_operator.h"

namespace doris::pipeline {
OperatorPtr EmptySourceOperatorBuilder::build_operator() {
    // Create a shared, stateless placeholder source bound to this builder.
    auto source = std::make_shared<EmptySourceOperator>(this);
    return source;
}

} // namespace doris::pipeline
62 changes: 62 additions & 0 deletions be/src/pipeline/exec/empty_source_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "operator.h"

namespace doris::vectorized {
// NOTE(review): this forward declaration appears unused anywhere in this header —
// confirm no includer depends on it before removing.
class VExchangeNode;
} // namespace doris::vectorized

namespace doris::pipeline {

// Builder for EmptySourceOperator. Stores a copy of the row descriptor of the
// sub-plan the empty source stands in for, so row_desc() stays valid for the
// builder's lifetime.
class EmptySourceOperatorBuilder final : public OperatorBuilderBase {
public:
    EmptySourceOperatorBuilder(int32_t id, const RowDescriptor& row_descriptor)
            : OperatorBuilderBase(id, "EmptySourceOperator"), _row_descriptor(row_descriptor) {}

    // This builder always produces a source operator.
    bool is_source() const override { return true; }

    OperatorPtr build_operator() override;

    const RowDescriptor& row_desc() override { return _row_descriptor; }

private:
    // Owned copy (not a reference) — see ctor; keeps the descriptor alive.
    RowDescriptor _row_descriptor;
};

// A source operator that produces no rows: get_block() immediately reports
// FINISHED without touching the output block. Used as a placeholder where a
// pipeline needs a source but has no data to read (e.g. the build side of a
// hash join when this instance does not build the hash table itself — see
// PipelineFragmentContext::_build_pipelines).
class EmptySourceOperator final : public OperatorBase {
public:
    // `explicit` prevents accidental implicit conversion from a builder pointer.
    explicit EmptySourceOperator(OperatorBuilderBase* builder) : OperatorBase(builder) {}

    // Always readable: pulling from it just yields an immediate FINISHED state.
    bool can_read() override { return true; }
    bool is_pending_finish() const override { return false; }

    // No resources to acquire or initialize.
    Status prepare(RuntimeState*) override { return Status::OK(); }

    Status open(RuntimeState*) override { return Status::OK(); }

    // Produces nothing; signals FINISHED so the pipeline task can complete.
    Status get_block(RuntimeState* /*runtime_state*/, vectorized::Block* /*block*/,
                     SourceState& result_state) override {
        result_state = SourceState::FINISHED;
        return Status::OK();
    }

    // Sink is a no-op; this operator only acts as a source.
    Status sink(RuntimeState*, vectorized::Block*, SourceState) override { return Status::OK(); }
};

} // namespace doris::pipeline
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class HashJoinBuildSinkBuilder final : public OperatorBuilder<vectorized::HashJo
// Sink operator feeding build-side rows into the hash join node.
class HashJoinBuildSink final : public StreamingOperator<HashJoinBuildSinkBuilder> {
public:
    HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);

    // Writable only when the join node can accept build input: always for the
    // hash table builder, and only after the builder has signaled for a
    // shared-hash-table consumer (see HashJoinNode::can_sink_write()).
    bool can_write() override { return _node->can_sink_write(); }
};

} // namespace pipeline
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class OperatorBase {
/**
* Release all resources once this operator done its work.
*/
virtual Status close(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state);

Status set_child(OperatorPtr child) {
if (is_source()) {
Expand Down
9 changes: 8 additions & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/empty_set_operator.h"
#include "pipeline/exec/empty_source_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
Expand Down Expand Up @@ -451,7 +452,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
case TPlanNodeType::HASH_JOIN_NODE: {
auto* join_node = assert_cast<vectorized::HashJoinNode*>(node);
auto new_pipe = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
if (join_node->should_build_hash_table()) {
RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
} else {
OperatorBuilderPtr builder = std::make_shared<EmptySourceOperatorBuilder>(
next_operator_builder_id(), node->child(1)->row_desc());
new_pipe->add_operator(builder);
}
OperatorBuilderPtr join_sink =
std::make_shared<HashJoinBuildSinkBuilder>(next_operator_builder_id(), join_node);
RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
Expand Down
36 changes: 34 additions & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi

std::shared_ptr<FragmentExecState> exec_state;
std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
params.query_options.enable_pipeline_engine;
if (params.is_simplified_param) {
// Get common components from _fragments_ctx_map
std::lock_guard<std::mutex> lock(_lock);
Expand Down Expand Up @@ -642,6 +644,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
fragments_ctx->set_rsc_info = true;
}

fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(
pipeline_engine_enabled);
fragments_ctx->timeout_second = params.query_options.query_timeout;
_set_scan_concurrency(params, fragments_ctx.get());

Expand Down Expand Up @@ -701,8 +705,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
}

int64_t duration_ns = 0;
if (!params.query_options.__isset.enable_pipeline_engine ||
!params.query_options.enable_pipeline_engine) {
if (!pipeline_engine_enabled) {
{
SCOPED_RAW_TIMER(&duration_ns);
RETURN_IF_ERROR(exec_state->prepare(params));
Expand Down Expand Up @@ -740,6 +743,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
fragments_ctx->set_ready_to_execute_only();
}
_setup_shared_hashtable_for_broadcast_join(params, exec_state->executor()->runtime_state(),
fragments_ctx.get());
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
fragments_ctx->query_id, fragment_instance_id, params.backend_num,
Expand Down Expand Up @@ -1109,4 +1114,31 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
return Status::OK();
}

// Registers the builder/consumer fragment instances with the shared hash table
// controller for every broadcast hash join node in the fragment's plan.
// No-op unless the enable_share_hash_table_for_broadcast_join option is set
// and the fragment carries a plan. `state` is currently unused here.
void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
                                                             RuntimeState* state,
                                                             QueryFragmentsCtx* fragments_ctx) {
    const auto& opts = params.query_options;
    if (!opts.__isset.enable_share_hash_table_for_broadcast_join ||
        !opts.enable_share_hash_table_for_broadcast_join) {
        // Sharing is strictly opt-in via the session option.
        return;
    }

    if (!params.__isset.fragment || !params.fragment.__isset.plan ||
        params.fragment.plan.nodes.empty()) {
        // Nothing to inspect without a fragment plan.
        return;
    }

    for (const auto& node : params.fragment.plan.nodes) {
        const bool is_broadcast_hash_join = node.node_type == TPlanNodeType::HASH_JOIN_NODE &&
                                            node.hash_join_node.__isset.is_broadcast_join &&
                                            node.hash_join_node.is_broadcast_join;
        if (is_broadcast_hash_join && params.build_hash_table_for_broadcast_join) {
            // This instance builds the table; record it plus the consumers
            // that will wait for its signal.
            fragments_ctx->get_shared_hash_table_controller()->set_builder_and_consumers(
                    params.params.fragment_instance_id, params.instances_sharing_hash_table,
                    node.node_id);
        }
    }
}

} // namespace doris
7 changes: 4 additions & 3 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,17 @@ class FragmentMgr : public RestMonitorIface {

bool _is_scan_node(const TPlanNodeType::type& type);

void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
RuntimeState* state,
QueryFragmentsCtx* fragments_ctx);

// This is input params
ExecEnv* _exec_env;

std::mutex _lock;

std::condition_variable _cv;

std::mutex _lock_for_shared_hash_table;
std::condition_variable _cv_for_sharing_hashtable;

// Make sure that remove this before no data reference FragmentExecState
std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map;

Expand Down
9 changes: 2 additions & 7 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,9 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) {

RETURN_IF_ERROR(sink(state, &block, eos));
}
RETURN_IF_ERROR(child(1)->close(state));
} else {
RETURN_IF_ERROR(child(1)->close(state));
RETURN_IF_ERROR(sink(state, nullptr, eos));
}
return Status::OK();
Expand Down Expand Up @@ -768,9 +770,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc

if (_should_build_hash_table && eos) {
// For pipeline engine, children should be closed once this pipeline task is finished.
if (!state->enable_pipeline_exec()) {
child(1)->close(state);
}
if (!_build_side_mutable_block.empty()) {
if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) {
return Status::NotSupported(
Expand Down Expand Up @@ -815,10 +814,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
}
} else if (!_should_build_hash_table &&
((state->enable_pipeline_exec() && eos) || !state->enable_pipeline_exec())) {
// TODO: For pipeline engine, we should finish this pipeline task if _should_build_hash_table is false
if (!state->enable_pipeline_exec()) {
child(1)->close(state);
}
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
auto wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime");
Expand Down
9 changes: 9 additions & 0 deletions be/src/vec/exec/join/vhash_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,15 @@ class HashJoinNode final : public VJoinNodeBase {

void debug_string(int indentation_level, std::stringstream* out) const override;

// The hash table builder can always accept build input; a shared-hash-table
// consumer must wait until the builder has signaled the table as ready.
bool can_sink_write() const {
    return _should_build_hash_table ||
           (_shared_hash_table_context && _shared_hash_table_context->signaled);
}

// True when this fragment instance is responsible for building the hash table.
bool should_build_hash_table() const { return _should_build_hash_table; }

private:
using VExprContexts = std::vector<VExprContext*>;
// probe expr
Expand Down
18 changes: 18 additions & 0 deletions be/src/vec/runtime/shared_hash_table_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,28 @@
namespace doris {
namespace vectorized {

// Records, for one plan node, which fragment instance builds the shared hash
// table and which instances consume it.
void SharedHashTableController::set_builder_and_consumers(TUniqueId builder,
                                                          const std::vector<TUniqueId>& consumers,
                                                          int node_id) {
    // Only need to set builder and consumers with pipeline engine enabled.
    DCHECK(_pipeline_engine_enabled);
    std::lock_guard<std::mutex> guard(_mutex);
    // A node's builder may only be registered once.
    DCHECK(_builder_fragment_ids.find(node_id) == _builder_fragment_ids.cend());
    _builder_fragment_ids.emplace(node_id, builder);
    auto& refs = _ref_fragments[node_id];
    refs.assign(consumers.cbegin(), consumers.cend());
}

bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id,
int my_node_id) {
std::lock_guard<std::mutex> lock(_mutex);
auto it = _builder_fragment_ids.find(my_node_id);
if (_pipeline_engine_enabled) {
if (it != _builder_fragment_ids.cend()) {
return it->second == fragment_instance_id;
}
return false;
}

if (it == _builder_fragment_ids.cend()) {
_builder_fragment_ids.insert({my_node_id, fragment_instance_id});
return true;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/runtime/shared_hash_table_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,21 @@ using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;

// Coordinates sharing of a build-side hash table between the fragment instance
// that builds it and the instances that consume it (broadcast join). The
// registration/lookup paths shown in the .cpp guard the maps with _mutex.
class SharedHashTableController {
public:
    /// Set the hash table builder's fragment instance id and the consumers'
    /// fragment instance ids for the given plan node. Only used with the
    /// pipeline engine enabled (DCHECKed in the implementation).
    void set_builder_and_consumers(TUniqueId builder, const std::vector<TUniqueId>& consumers,
                                   int node_id);
    TUniqueId get_builder_fragment_instance_id(int my_node_id);
    SharedHashTableContextPtr get_context(int my_node_id);
    void signal(int my_node_id);
    Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context);
    // Decides whether the given fragment instance should build the hash table
    // for node my_node_id; behavior differs under the pipeline engine.
    bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id);
    // NOTE(review): presumably must be called before any should_build_hash_table()
    // call for the flag to take effect — confirm against FragmentMgr usage.
    void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled = enabled; }

private:
    bool _pipeline_engine_enabled = false;
    std::mutex _mutex;
    std::condition_variable _cv;
    // node id -> fragment instances consuming that node's shared hash table.
    std::map<int, std::vector<TUniqueId>> _ref_fragments;
    std::map<int /*node id*/, TUniqueId /*fragment instance id*/> _builder_fragment_ids;
    std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ private PlanFragment createHashJoinFragment(
node.setChild(0, leftChildFragment.getPlanRoot());
connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
leftChildFragment.setPlanRoot(node);
rightChildFragment.setRightChildOfBroadcastHashJoin(true);
return leftChildFragment;
} else {
node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// has colocate plan node
private boolean hasColocatePlanNode = false;

private boolean isRightChildOfBroadcastHashJoin = false;

/**
* C'tor for fragment with specific partition; the output is by default broadcast.
*/
Expand Down Expand Up @@ -425,6 +427,14 @@ public boolean isTransferQueryStatisticsWithEveryBatch() {
return transferQueryStatisticsWithEveryBatch;
}

/**
 * Whether this fragment is the right (build-side) child of a broadcast hash join
 * (set by DistributedPlanner when it creates a broadcast hash join fragment).
 */
public boolean isRightChildOfBroadcastHashJoin() {
    return isRightChildOfBroadcastHashJoin;
}

public void setRightChildOfBroadcastHashJoin(boolean value) {
    isRightChildOfBroadcastHashJoin = value;
}

public int getFragmentSequenceNum() {
if (ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) {
return fragmentSequenceNum;
Expand Down
Loading