From d719fab5f55cfbdf23da0cf059a58b869b43600b Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Sat, 14 Feb 2026 18:26:33 +0800
Subject: [PATCH 1/2] [fix](local exchange) Fix BUCKET_HASH_SHUFFLE partition expr

---
 be/src/pipeline/pipeline_fragment_context.cpp | 49 ++++++++++---------
 be/src/pipeline/pipeline_fragment_context.h   | 11 ++---
 2 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 4dfa58e45a0ac5..ae60d97f178090 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -650,7 +650,7 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
     int node_idx = 0;
 
     RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, descs, nullptr,
-                                        &node_idx, root, cur_pipe, 0, false));
+                                        &node_idx, root, cur_pipe, 0, false, false));
 
     if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
         return Status::InternalError(
@@ -659,12 +659,10 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
     return Status::OK();
 }
 
-Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
-                                                    const std::vector<TPlanNode>& tnodes,
-                                                    const DescriptorTbl& descs, OperatorPtr parent,
-                                                    int* node_idx, OperatorPtr* root,
-                                                    PipelinePtr& cur_pipe, int child_idx,
-                                                    const bool followed_by_shuffled_operator) {
+Status PipelineFragmentContext::_create_tree_helper(
+        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
+        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
+        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
     // propagate error case
     if (*node_idx >= tnodes.size()) {
         return Status::InternalError(
@@ -675,10 +673,12 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
 
     int num_children = tnodes[*node_idx].num_children;
     bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
+    bool current_require_bucket_distribution = require_bucket_distribution;
     OperatorPtr op = nullptr;
     RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                      parent == nullptr ? -1 : parent->node_id(), child_idx,
-                                     followed_by_shuffled_operator));
+                                     followed_by_shuffled_operator,
+                                     current_require_bucket_distribution));
     // Initialization must be done here. For example, group by expressions in agg will be used to
     // decide if a local shuffle should be planed, so it must be initialized here.
     RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
@@ -710,6 +710,12 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
                                                     : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
 
+    current_require_bucket_distribution =
+            (require_bucket_distribution ||
+             (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
+                                            : op->is_colocated_operator())) &&
+            Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
+
     if (num_children == 0) {
         _use_serial_source = op->is_serial_operator();
     }
@@ -717,7 +723,8 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
     for (int i = 0; i < num_children; i++) {
         ++*node_idx;
         RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
-                                            current_followed_by_shuffled_operator));
+                                            current_followed_by_shuffled_operator,
+                                            current_require_bucket_distribution));
 
         // we are expecting a child, but have used all nodes
         // this means we have been given a bad tree and must fail
@@ -1207,18 +1214,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                                  const DescriptorTbl& descs, OperatorPtr& op,
                                                  PipelinePtr& cur_pipe, int parent_idx,
                                                  int child_idx,
-                                                 const bool followed_by_shuffled_operator) {
+                                                 const bool followed_by_shuffled_operator,
+                                                 const bool require_bucket_distribution) {
    std::vector sink_ops;
    Defer defer = Defer([&]() {
        if (op) {
-            op->update_operator(tnode, followed_by_shuffled_operator, _require_bucket_distribution);
-            _require_bucket_distribution =
-                    _require_bucket_distribution || op->is_colocated_operator();
+            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
        }
        for (auto& s : sink_ops) {
-            s->update_operator(tnode, followed_by_shuffled_operator, _require_bucket_distribution);
-            _require_bucket_distribution =
-                    _require_bucket_distribution || s->is_colocated_operator();
+            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
        }
    });
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
@@ -1593,15 +1597,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         break;
     }
     case TPlanNodeType::INTERSECT_NODE: {
-        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
-                !tnode.intersect_node.is_colocate || followed_by_shuffled_operator, sink_ops));
+        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
+                                                                      cur_pipe, sink_ops));
         break;
     }
     case TPlanNodeType::EXCEPT_NODE: {
-        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
-                !tnode.except_node.is_colocate || followed_by_shuffled_operator, sink_ops));
+        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
+                                                                       cur_pipe, sink_ops));
         break;
     }
     case TPlanNodeType::REPEAT_NODE: {
@@ -1698,8 +1700,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 template <bool is_intersect>
 Status PipelineFragmentContext::_build_operators_for_set_operation_node(
         ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
-        PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool followed_by_shuffled_operator,
-        std::vector& sink_ops) {
+        PipelinePtr& cur_pipe, std::vector& sink_ops) {
     op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
     RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index f908abc6b31753..867268e3d7e484 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -144,17 +144,17 @@ class PipelineFragmentContext : public TaskExecutionContext {
     Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& tnodes,
                                const DescriptorTbl& descs, OperatorPtr parent, int* node_idx,
                                OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
-                               const bool followed_by_shuffled_join);
+                               const bool followed_by_shuffled_join,
+                               const bool require_bucket_distribution);
     Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
                             OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx,
-                            const bool followed_by_shuffled_join);
+                            const bool followed_by_shuffled_join,
+                            const bool require_bucket_distribution);
     template <bool is_intersect>
     Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode,
                                                    const DescriptorTbl& descs, OperatorPtr& op,
-                                                   PipelinePtr& cur_pipe, int parent_idx,
-                                                   int child_idx,
-                                                   bool followed_by_shuffled_operator,
+                                                   PipelinePtr& cur_pipe,
                                                    std::vector& sink_ops);
 
     Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
@@ -337,7 +337,6 @@ class PipelineFragmentContext : public TaskExecutionContext {
 
     // Total instance num running on all BEs
     int _total_instances = -1;
-    bool _require_bucket_distribution = false;
 
     TPipelineFragmentParams _params;
     int32_t _parallel_instances = 0;
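Note on PATCH 1/2: rather than accumulating a mutable _require_bucket_distribution member while the plan tree is walked, the flag is now derived per node inside _create_tree_helper and threaded down to _create_operator and to every child, mirroring the existing followed_by_shuffled_operator parameter. The standalone sketch below only illustrates that recursion pattern; PlanNode, build_node and the flag derivation are simplified placeholders, not the real Doris types or planner logic.

    // Sketch: per-node flags are computed from the parent's flag and passed down as
    // parameters instead of being accumulated in a member of the builder object.
    #include <iostream>
    #include <vector>

    struct PlanNode {
        const char* name;
        bool is_colocated = false;   // stands in for op->is_colocated_operator()
        bool hash_exchange = false;  // stands in for Pipeline::is_hash_exchange(...)
        std::vector<PlanNode> children;
    };

    // The flag only flows down the call stack, so siblings and later walks of the
    // tree can never observe state left behind by a previous node or a previous build.
    void build_node(const PlanNode& node, bool require_bucket_distribution) {
        bool current = (require_bucket_distribution || node.is_colocated) && node.hash_exchange;
        std::cout << node.name << ": require_bucket_distribution=" << current << '\n';
        for (const PlanNode& child : node.children) {
            build_node(child, current);
        }
    }

    int main() {
        PlanNode scan{"OLAP_SCAN_NODE"};
        PlanNode agg{"AGGREGATION_NODE", /*is_colocated=*/true, /*hash_exchange=*/true, {scan}};
        PlanNode root{"EXCHANGE_NODE", /*is_colocated=*/false, /*hash_exchange=*/false, {agg}};
        build_node(root, /*require_bucket_distribution=*/false);  // every walk starts clean
        return 0;
    }

Because the flag lives on the call stack, every build of the pipelines starts from a clean state, which is what allows PATCH 2/2 to drop the reset in rebuild().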
From d6be3efbdd35d5462325c5f186bd579085845274 Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Sat, 14 Feb 2026 18:32:06 +0800
Subject: [PATCH 2/2] update

---
 be/src/pipeline/pipeline_fragment_context.cpp | 2 --
 1 file changed, 2 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index ae60d97f178090..24df3581e682c2 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -2099,8 +2099,6 @@ Status PipelineFragmentContext::set_to_rerun() {
 Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) {
     _submitted = false;
     _is_fragment_instance_closed = false;
-    // _require_bucket_distribution may be set to true to affect the building of pipeline with local shuffle
-    _require_bucket_distribution = false;
     return _build_and_prepare_full_pipeline(thread_pool);
 }
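Note on PATCH 2/2: with the member removed, rebuild() no longer has builder state to clear. The toy sketch below shows the stale-state hazard the deleted reset used to guard against; MemberFlagBuilder and its methods are invented for illustration and do not exist in Doris.

    #include <cassert>

    // A builder that accumulates the flag in a member must remember to clear it before
    // every rebuild; the parameter-passing style from PATCH 1/2 removes that obligation.
    class MemberFlagBuilder {
    public:
        void build(bool node_is_colocated) {
            _require_bucket_distribution = _require_bucket_distribution || node_is_colocated;
        }
        void rebuild(bool node_is_colocated) {
            _require_bucket_distribution = false;  // the kind of reset PATCH 2/2 deletes
            build(node_is_colocated);
        }
        bool require_bucket_distribution() const { return _require_bucket_distribution; }

    private:
        bool _require_bucket_distribution = false;
    };

    int main() {
        MemberFlagBuilder builder;
        builder.build(/*node_is_colocated=*/true);
        assert(builder.require_bucket_distribution());   // first build saw a colocated operator
        builder.rebuild(/*node_is_colocated=*/false);
        assert(!builder.require_bucket_distribution());  // holds only because rebuild() clears the member
        return 0;
    }

After PATCH 1/2 the flag is recomputed from the plan tree on every walk, so there is nothing left to reset between builds.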