From c6cf13b164bd3ae08ad2293b1272f796ef8d814a Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Wed, 4 Jan 2023 22:00:18 +0800 Subject: [PATCH] [improvement](pipeline) Support sharing hash table for broadcast join --- be/src/pipeline/CMakeLists.txt | 3 +- .../pipeline/exec/empty_source_operator.cpp | 25 ++++++++ be/src/pipeline/exec/empty_source_operator.h | 62 +++++++++++++++++++ be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- be/src/pipeline/exec/operator.h | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 9 ++- be/src/runtime/fragment_mgr.cpp | 36 ++++++++++- be/src/runtime/fragment_mgr.h | 7 ++- be/src/vec/exec/join/vhash_join_node.cpp | 9 +-- be/src/vec/exec/join/vhash_join_node.h | 9 +++ .../runtime/shared_hash_table_controller.cpp | 18 ++++++ .../runtime/shared_hash_table_controller.h | 6 ++ .../doris/planner/DistributedPlanner.java | 1 + .../apache/doris/planner/PlanFragment.java | 10 +++ .../java/org/apache/doris/qe/Coordinator.java | 49 ++++++++++++--- .../org/apache/doris/qe/SessionVariable.java | 5 +- gensrc/thrift/PaloInternalService.thrift | 4 ++ 17 files changed, 230 insertions(+), 27 deletions(-) create mode 100644 be/src/pipeline/exec/empty_source_operator.cpp create mode 100644 be/src/pipeline/exec/empty_source_operator.h diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt index c7c5bf87afadcf..31ebe803066500 100644 --- a/be/src/pipeline/CMakeLists.txt +++ b/be/src/pipeline/CMakeLists.txt @@ -58,7 +58,8 @@ set(PIPELINE_FILES exec/union_sink_operator.cpp exec/union_source_operator.cpp exec/data_queue.cpp - exec/select_operator.cpp) + exec/select_operator.cpp + exec/empty_source_operator.cpp) add_library(Pipeline STATIC ${PIPELINE_FILES} diff --git a/be/src/pipeline/exec/empty_source_operator.cpp b/be/src/pipeline/exec/empty_source_operator.cpp new file mode 100644 index 00000000000000..b5b140305b4b7e --- /dev/null +++ b/be/src/pipeline/exec/empty_source_operator.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "empty_source_operator.h" + +namespace doris::pipeline { +OperatorPtr EmptySourceOperatorBuilder::build_operator() { + return std::make_shared(this); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/empty_source_operator.h b/be/src/pipeline/exec/empty_source_operator.h new file mode 100644 index 00000000000000..0cfa3bcbf1f068 --- /dev/null +++ b/be/src/pipeline/exec/empty_source_operator.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "operator.h" + +namespace doris::vectorized { +class VExchangeNode; +} + +namespace doris::pipeline { + +class EmptySourceOperatorBuilder final : public OperatorBuilderBase { +public: + EmptySourceOperatorBuilder(int32_t id, const RowDescriptor& row_descriptor) + : OperatorBuilderBase(id, "EmptySourceOperator"), _row_descriptor(row_descriptor) {} + + bool is_source() const override { return true; } + + OperatorPtr build_operator() override; + + const RowDescriptor& row_desc() override { return _row_descriptor; } + +private: + RowDescriptor _row_descriptor; +}; + +class EmptySourceOperator final : public OperatorBase { +public: + EmptySourceOperator(OperatorBuilderBase* builder) : OperatorBase(builder) {} + bool can_read() override { return true; } + bool is_pending_finish() const override { return false; } + + Status prepare(RuntimeState*) override { return Status::OK(); } + + Status open(RuntimeState*) override { return Status::OK(); } + + Status get_block(RuntimeState* /*runtime_state*/, vectorized::Block* /*block*/, + SourceState& result_state) override { + result_state = SourceState::FINISHED; + return Status::OK(); + } + + Status sink(RuntimeState*, vectorized::Block*, SourceState) override { return Status::OK(); } +}; + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 404b8e53ad4dcf..d41f9df1034741 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -39,7 +39,7 @@ class HashJoinBuildSinkBuilder final : public OperatorBuilder { public: HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node); - bool can_write() override { return true; }; + bool can_write() override { return _node->can_sink_write(); }; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 14afcedfbb9f26..22276d21a145f9 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -168,7 +168,7 @@ class OperatorBase { /** * Release all resources once this operator done its work. */ - virtual Status close(RuntimeState* state) = 0; + virtual Status close(RuntimeState* state); Status set_child(OperatorPtr child) { if (is_source()) { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 1430c8d6f68383..e14839af355130 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -34,6 +34,7 @@ #include "pipeline/exec/data_queue.h" #include "pipeline/exec/datagen_operator.h" #include "pipeline/exec/empty_set_operator.h" +#include "pipeline/exec/empty_source_operator.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" @@ -451,7 +452,13 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur case TPlanNodeType::HASH_JOIN_NODE: { auto* join_node = assert_cast(node); auto new_pipe = add_pipeline(); - RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe)); + if (join_node->should_build_hash_table()) { + RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe)); + } else { + OperatorBuilderPtr builder = std::make_shared( + next_operator_builder_id(), node->child(1)->row_desc()); + new_pipe->add_operator(builder); + } OperatorBuilderPtr join_sink = std::make_shared(next_operator_builder_id(), join_node); RETURN_IF_ERROR(new_pipe->set_sink(join_sink)); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c89cfc69ef5285..a790dfb1557553 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -611,6 +611,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi std::shared_ptr exec_state; std::shared_ptr fragments_ctx; + bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine && + params.query_options.enable_pipeline_engine; if (params.is_simplified_param) { // Get common components from _fragments_ctx_map std::lock_guard lock(_lock); @@ -642,6 +644,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi fragments_ctx->set_rsc_info = true; } + fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled( + pipeline_engine_enabled); fragments_ctx->timeout_second = params.query_options.query_timeout; _set_scan_concurrency(params, fragments_ctx.get()); @@ -701,8 +705,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi } int64_t duration_ns = 0; - if (!params.query_options.__isset.enable_pipeline_engine || - !params.query_options.enable_pipeline_engine) { + if (!pipeline_engine_enabled) { { SCOPED_RAW_TIMER(&duration_ns); RETURN_IF_ERROR(exec_state->prepare(params)); @@ -740,6 +743,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { fragments_ctx->set_ready_to_execute_only(); } + _setup_shared_hashtable_for_broadcast_join(params, exec_state->executor()->runtime_state(), + fragments_ctx.get()); std::shared_ptr context = std::make_shared( fragments_ctx->query_id, fragment_instance_id, params.backend_num, @@ -1109,4 +1114,31 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, return Status::OK(); } +void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params, + RuntimeState* state, + QueryFragmentsCtx* fragments_ctx) { + if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join || + !params.query_options.enable_share_hash_table_for_broadcast_join) { + return; + } + + if (!params.__isset.fragment || !params.fragment.__isset.plan || + params.fragment.plan.nodes.empty()) { + return; + } + for (auto& node : params.fragment.plan.nodes) { + if (node.node_type != TPlanNodeType::HASH_JOIN_NODE || + !node.hash_join_node.__isset.is_broadcast_join || + !node.hash_join_node.is_broadcast_join) { + continue; + } + + if (params.build_hash_table_for_broadcast_join) { + fragments_ctx->get_shared_hash_table_controller()->set_builder_and_consumers( + params.params.fragment_instance_id, params.instances_sharing_hash_table, + node.node_id); + } + } +} + } // namespace doris diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 050db423dd0b19..837451ec08c615 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -119,6 +119,10 @@ class FragmentMgr : public RestMonitorIface { bool _is_scan_node(const TPlanNodeType::type& type); + void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params, + RuntimeState* state, + QueryFragmentsCtx* fragments_ctx); + // This is input params ExecEnv* _exec_env; @@ -126,9 +130,6 @@ class FragmentMgr : public RestMonitorIface { std::condition_variable _cv; - std::mutex _lock_for_shared_hash_table; - std::condition_variable _cv_for_sharing_hashtable; - // Make sure that remove this before no data reference FragmentExecState std::unordered_map> _fragment_map; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index a71a6c931c8baa..ae8ff6e3021bbe 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -718,7 +718,9 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { RETURN_IF_ERROR(sink(state, &block, eos)); } + RETURN_IF_ERROR(child(1)->close(state)); } else { + RETURN_IF_ERROR(child(1)->close(state)); RETURN_IF_ERROR(sink(state, nullptr, eos)); } return Status::OK(); @@ -768,9 +770,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc if (_should_build_hash_table && eos) { // For pipeline engine, children should be closed once this pipeline task is finished. - if (!state->enable_pipeline_exec()) { - child(1)->close(state); - } if (!_build_side_mutable_block.empty()) { if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) { return Status::NotSupported( @@ -815,10 +814,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc } } else if (!_should_build_hash_table && ((state->enable_pipeline_exec() && eos) || !state->enable_pipeline_exec())) { - // TODO: For pipeline engine, we should finish this pipeline task if _should_build_hash_table is false - if (!state->enable_pipeline_exec()) { - child(1)->close(state); - } DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); auto wait_timer = ADD_TIMER(_build_phase_profile, "WaitForSharedHashTableTime"); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 9e1f61050ae123..b85c92f6fc8c12 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -214,6 +214,15 @@ class HashJoinNode final : public VJoinNodeBase { void debug_string(int indentation_level, std::stringstream* out) const override; + bool can_sink_write() const { + if (_should_build_hash_table) { + return true; + } + return _shared_hash_table_context && _shared_hash_table_context->signaled; + } + + bool should_build_hash_table() const { return _should_build_hash_table; } + private: using VExprContexts = std::vector; // probe expr diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index e9e125a168cafe..f18059fc7dd852 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -22,10 +22,28 @@ namespace doris { namespace vectorized { +void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, + const std::vector& consumers, + int node_id) { + // Only need to set builder and consumers with pipeline engine enabled. + DCHECK(_pipeline_engine_enabled); + std::lock_guard lock(_mutex); + DCHECK(_builder_fragment_ids.find(node_id) == _builder_fragment_ids.cend()); + _builder_fragment_ids.insert({node_id, builder}); + _ref_fragments[node_id].assign(consumers.cbegin(), consumers.cend()); +} + bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id) { std::lock_guard lock(_mutex); auto it = _builder_fragment_ids.find(my_node_id); + if (_pipeline_engine_enabled) { + if (it != _builder_fragment_ids.cend()) { + return it->second == fragment_instance_id; + } + return false; + } + if (it == _builder_fragment_ids.cend()) { _builder_fragment_ids.insert({my_node_id, fragment_instance_id}); return true; diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index e2c54f533db9a6..48bf803c406d6c 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -64,15 +64,21 @@ using SharedHashTableContextPtr = std::shared_ptr; class SharedHashTableController { public: + /// set hash table builder's fragment instance id and consumers' fragment instance id + void set_builder_and_consumers(TUniqueId builder, const std::vector& consumers, + int node_id); TUniqueId get_builder_fragment_instance_id(int my_node_id); SharedHashTableContextPtr get_context(int my_node_id); void signal(int my_node_id); Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context); bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id); + void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled = enabled; } private: + bool _pipeline_engine_enabled = false; std::mutex _mutex; std::condition_variable _cv; + std::map> _ref_fragments; std::map _builder_fragment_ids; std::map _shared_contexts; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index c537434cc04ee4..89c491165ad78b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -385,6 +385,7 @@ private PlanFragment createHashJoinFragment( node.setChild(0, leftChildFragment.getPlanRoot()); connectChildFragment(node, 1, leftChildFragment, rightChildFragment); leftChildFragment.setPlanRoot(node); + rightChildFragment.setRightChildOfBroadcastHashJoin(true); return leftChildFragment; } else { node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index cae81933112b77..a30eca1171d119 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -145,6 +145,8 @@ public class PlanFragment extends TreeNode { // has colocate plan node private boolean hasColocatePlanNode = false; + private boolean isRightChildOfBroadcastHashJoin = false; + /** * C'tor for fragment with specific partition; the output is by default broadcast. */ @@ -425,6 +427,14 @@ public boolean isTransferQueryStatisticsWithEveryBatch() { return transferQueryStatisticsWithEveryBatch; } + public boolean isRightChildOfBroadcastHashJoin() { + return isRightChildOfBroadcastHashJoin; + } + + public void setRightChildOfBroadcastHashJoin(boolean value) { + isRightChildOfBroadcastHashJoin = value; + } + public int getFragmentSequenceNum() { if (ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) { return fragmentSequenceNum; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 7ef8ebedb71578..1a36611dbec7d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -230,6 +230,10 @@ public class Coordinator { // eg, System.currentTimeMillis() + query_timeout * 1000 private long timeoutDeadline; + private boolean enableShareHashTableForBroadcastJoin = false; + + private boolean enablePipelineEngine = false; + // Runtime filter merge instance address and ID public TNetworkAddress runtimeFilterMergeAddr; public TUniqueId runtimeFilterMergeInstanceId; @@ -250,6 +254,8 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.scanNodes = planner.getScanNodes(); this.descTable = planner.getDescTable().toThrift(); this.returnedAllResults = false; + this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin; + this.enablePipelineEngine = context.getSessionVariable().enablePipelineEngine; initQueryOptions(context); setFromUserProperty(analyzer); @@ -1070,13 +1076,37 @@ private void computeFragmentExecParams() throws Exception { params.destinations.add(dest); } } else { - // add destination host to this fragment's destination - for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { - TPlanFragmentDestination dest = new TPlanFragmentDestination(); - dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; - dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); - dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host)); - params.destinations.add(dest); + if (enablePipelineEngine && enableShareHashTableForBroadcastJoin + && params.fragment.isRightChildOfBroadcastHashJoin()) { + // here choose the first instance to build hash table. + Map destHosts = new HashMap<>(); + destParams.instanceExecParams.forEach(param -> { + if (destHosts.containsKey(param.host)) { + destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId); + return; + } + destHosts.put(param.host, param); + + param.buildHashTableForBroadcastJoin = true; + TPlanFragmentDestination dest = new TPlanFragmentDestination(); + dest.fragment_instance_id = param.instanceId; + try { + dest.server = toRpcHost(param.host); + dest.setBrpcServer(toBrpcHost(param.host)); + } catch (Exception e) { + throw new RuntimeException(e); + } + params.destinations.add(dest); + }); + } else { + // add destination host to this fragment's destination + for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { + TPlanFragmentDestination dest = new TPlanFragmentDestination(); + dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; + dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); + dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host)); + params.destinations.add(dest); + } } } } @@ -2326,6 +2356,7 @@ List toThrift(int backendNum) { params.setDescTbl(descTable); params.setParams(new TPlanFragmentExecParams()); params.setResourceInfo(tResourceInfo); + params.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin); params.params.setQueryId(queryId); params.params.setFragmentInstanceId(instanceExecParam.instanceId); Map> scanRanges = instanceExecParam.perNodeScanRanges; @@ -2455,6 +2486,10 @@ static class FInstanceExecParam { FragmentExecParams fragmentExecParams; + boolean buildHashTableForBroadcastJoin = false; + + List instancesSharingHashTable = Lists.newArrayList(); + public void addBucketSeq(int bucketSeq) { this.bucketSeqSet.add(bucketSeq); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index e87e24720767c3..75352d52979a6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1341,10 +1341,7 @@ public TQueryOptions toThrift() { tResult.setEnablePipelineEngine(enablePipelineEngine); tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary); tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery); - - // TODO: enable share hashtable for broadcast after switching completely to pipeline engine. - tResult.setEnableShareHashTableForBroadcastJoin( - enablePipelineEngine ? false : enableShareHashTableForBroadcastJoin); + tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin); tResult.setBatchSize(batchSize); tResult.setDisableStreamPreaggregations(disableStreamPreaggregations); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index bb241c4aa010b4..c03ef169105553 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -376,6 +376,10 @@ struct TExecPlanFragmentParams { // it will wait for the FE to send the "start execution" command before it is actually executed. // Otherwise, the fragment will start executing directly on the BE side. 20: optional bool need_wait_execution_trigger = false; + + 21: optional bool build_hash_table_for_broadcast_join = false; + + 22: optional list instances_sharing_hash_table; } struct TExecPlanFragmentParamsList {