Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 25 additions & 26 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
int node_idx = 0;

RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, descs, nullptr,
&node_idx, root, cur_pipe, 0, false));
&node_idx, root, cur_pipe, 0, false, false));

if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
return Status::InternalError(
Expand All @@ -659,12 +659,10 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
return Status::OK();
}

Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
const std::vector<TPlanNode>& tnodes,
const DescriptorTbl& descs, OperatorPtr parent,
int* node_idx, OperatorPtr* root,
PipelinePtr& cur_pipe, int child_idx,
const bool followed_by_shuffled_operator) {
Status PipelineFragmentContext::_create_tree_helper(
ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
// propagate error case
if (*node_idx >= tnodes.size()) {
return Status::InternalError(
Expand All @@ -675,10 +673,12 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,

int num_children = tnodes[*node_idx].num_children;
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
bool current_require_bucket_distribution = require_bucket_distribution;
OperatorPtr op = nullptr;
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
parent == nullptr ? -1 : parent->node_id(), child_idx,
followed_by_shuffled_operator));
followed_by_shuffled_operator,
current_require_bucket_distribution));
// Initialization must be done here. For example, the group-by expressions in agg are used to
// decide whether a local shuffle should be planned, so the operator must be initialized here.
RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
Expand Down Expand Up @@ -710,14 +710,21 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
: op->is_shuffled_operator())) &&
Pipeline::is_hash_exchange(required_data_distribution.distribution_type);

current_require_bucket_distribution =
(require_bucket_distribution ||
(cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
: op->is_colocated_operator())) &&
Pipeline::is_hash_exchange(required_data_distribution.distribution_type);

if (num_children == 0) {
_use_serial_source = op->is_serial_operator();
}
// This relies on tnodes being in preorder of the plan.
for (int i = 0; i < num_children; i++) {
++*node_idx;
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
current_followed_by_shuffled_operator));
current_followed_by_shuffled_operator,
current_require_bucket_distribution));

// we are expecting a child, but have used all nodes
// this means we have been given a bad tree and must fail
Expand Down Expand Up @@ -1207,18 +1214,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
const DescriptorTbl& descs, OperatorPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx,
const bool followed_by_shuffled_operator) {
const bool followed_by_shuffled_operator,
const bool require_bucket_distribution) {
std::vector<DataSinkOperatorPtr> sink_ops;
Defer defer = Defer([&]() {
if (op) {
op->update_operator(tnode, followed_by_shuffled_operator, _require_bucket_distribution);
_require_bucket_distribution =
_require_bucket_distribution || op->is_colocated_operator();
op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
}
for (auto& s : sink_ops) {
s->update_operator(tnode, followed_by_shuffled_operator, _require_bucket_distribution);
_require_bucket_distribution =
_require_bucket_distribution || s->is_colocated_operator();
s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
}
});
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
Expand Down Expand Up @@ -1593,15 +1597,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
break;
}
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
!tnode.intersect_node.is_colocate || followed_by_shuffled_operator, sink_ops));
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
cur_pipe, sink_ops));
break;
}
case TPlanNodeType::EXCEPT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
!tnode.except_node.is_colocate || followed_by_shuffled_operator, sink_ops));
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
cur_pipe, sink_ops));
break;
}
case TPlanNodeType::REPEAT_NODE: {
Expand Down Expand Up @@ -1698,8 +1700,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool followed_by_shuffled_operator,
std::vector<DataSinkOperatorPtr>& sink_ops) {
PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));

Expand Down Expand Up @@ -2098,8 +2099,6 @@ Status PipelineFragmentContext::set_to_rerun() {
// Resets the per-run state of this fragment context (_submitted,
// _is_fragment_instance_closed, _require_bucket_distribution) and then returns
// the Status produced by _build_and_prepare_full_pipeline(thread_pool).
Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) {
_submitted = false;
_is_fragment_instance_closed = false;
// _require_bucket_distribution may have been set to true, which affects how pipelines with
// local shuffle are built, so clear it before building again.
_require_bucket_distribution = false;
return _build_and_prepare_full_pipeline(thread_pool);
}

Expand Down
11 changes: 5 additions & 6 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,17 @@ class PipelineFragmentContext : public TaskExecutionContext {
Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& tnodes,
const DescriptorTbl& descs, OperatorPtr parent, int* node_idx,
OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
const bool followed_by_shuffled_join);
const bool followed_by_shuffled_join,
const bool require_bucket_distribution);

Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx,
const bool followed_by_shuffled_join);
const bool followed_by_shuffled_join,
const bool require_bucket_distribution);
template <bool is_intersect>
Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs, OperatorPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx,
bool followed_by_shuffled_operator,
PipelinePtr& cur_pipe,
std::vector<DataSinkOperatorPtr>& sink_ops);

Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
Expand Down Expand Up @@ -337,7 +337,6 @@ class PipelineFragmentContext : public TaskExecutionContext {

// Total instance num running on all BEs
int _total_instances = -1;
bool _require_bucket_distribution = false;

TPipelineFragmentParams _params;
int32_t _parallel_instances = 0;
Expand Down
Loading