Skip to content

Commit

Permalink
[Enhancement] Push down topn with left and right outer join (StarRock…
Browse files Browse the repository at this point in the history
…s#30128)

Signed-off-by: stephen <stephen5217@163.com>
  • Loading branch information
stephen-shelby authored and Jay-ju committed Sep 7, 2023
1 parent 9eec04f commit 30e1545
Show file tree
Hide file tree
Showing 5 changed files with 497 additions and 1 deletion.
14 changes: 13 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String CONNECTOR_IO_TASKS_SLOW_IO_LATENCY_MS = "connector_io_tasks_slow_io_latency_ms";
public static final String SCAN_USE_QUERY_MEM_RATIO = "scan_use_query_mem_ratio";
public static final String CONNECTOR_SCAN_USE_QUERY_MEM_RATIO = "connector_scan_use_query_mem_ratio";

public static final String ENABLE_QUERY_CACHE = "enable_query_cache";
public static final String QUERY_CACHE_FORCE_POPULATE = "query_cache_force_populate";
public static final String QUERY_CACHE_ENTRY_MAX_BYTES = "query_cache_entry_max_bytes";
Expand Down Expand Up @@ -497,6 +496,8 @@ public static MaterializedViewRewriteMode parse(String str) {
// binary, json, compact
public static final String THRIFT_PLAN_PROTOCOL = "thrift_plan_protocol";

public static final String CBO_PUSHDOWN_TOPN_LIMIT = "cbo_push_down_topn_limit";

public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
.add(MAX_EXECUTION_TIME)
Expand Down Expand Up @@ -1047,6 +1048,17 @@ public static MaterializedViewRewriteMode parse(String str) {
@VarAttr(name = THRIFT_PLAN_PROTOCOL)
private String thriftPlanProtocol = "binary";

@VarAttr(name = CBO_PUSHDOWN_TOPN_LIMIT)
private long cboPushDownTopNLimit = 1000;

public long getCboPushDownTopNLimit() {
return cboPushDownTopNLimit;
}

public void setCboPushDownTopNLimit(long cboPushDownTopNLimit) {
this.cboPushDownTopNLimit = cboPushDownTopNLimit;
}

public String getThriftPlanProtocol() {
return thriftPlanProtocol;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.starrocks.sql.optimizer.rule.transformation.MergeProjectWithChildRule;
import com.starrocks.sql.optimizer.rule.transformation.MergeTwoAggRule;
import com.starrocks.sql.optimizer.rule.transformation.MergeTwoProjectRule;
import com.starrocks.sql.optimizer.rule.transformation.OuterJoinAddRedundantTopNRule;
import com.starrocks.sql.optimizer.rule.transformation.PruneEmptyWindowRule;
import com.starrocks.sql.optimizer.rule.transformation.PushDownAggToMetaScanRule;
import com.starrocks.sql.optimizer.rule.transformation.PushDownJoinOnExpressionToChildProject;
Expand Down Expand Up @@ -401,6 +402,7 @@ private OptExpression logicalRuleRewrite(ConnectContext connectContext,
// After this rule, we shouldn't generate logical project operator
ruleRewriteIterative(tree, rootTaskContext, new MergeProjectWithChildRule());

ruleRewriteOnlyOnce(tree, rootTaskContext, new OuterJoinAddRedundantTopNRule());
ruleRewriteOnlyOnce(tree, rootTaskContext, RuleSetType.INTERSECT_REWRITE);
ruleRewriteIterative(tree, rootTaskContext, new RemoveAggregationFromAggTable());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public enum RuleType {
TF_PUSH_DOWN_PREDICATE_SCAN,
TF_PUSH_DOWN_PREDICATE_AGG,
TF_PUSH_DOWN_PREDICATE_WINDOW,
TF_PUSH_DOWN_TOPN_OUTER_JOIN,
TF_PUSH_DOWN_PREDICATE_RANKING_WINDOW,
TF_PUSH_DOWN_PREDICATE_JOIN,
TF_PUSH_DOWN_JOIN_CLAUSE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.sql.optimizer.rule.transformation;

import com.google.common.collect.Lists;
import com.starrocks.analysis.JoinOperator;
import com.starrocks.sql.optimizer.OptExpression;
import com.starrocks.sql.optimizer.OptimizerContext;
import com.starrocks.sql.optimizer.base.Ordering;
import com.starrocks.sql.optimizer.operator.OperatorType;
import com.starrocks.sql.optimizer.operator.logical.LogicalJoinOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalTopNOperator;
import com.starrocks.sql.optimizer.operator.pattern.Pattern;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
import com.starrocks.sql.optimizer.rule.RuleType;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class OuterJoinAddRedundantTopNRule extends TransformationRule {
public OuterJoinAddRedundantTopNRule() {
super(RuleType.TF_PUSH_DOWN_TOPN_OUTER_JOIN,
Pattern.create(OperatorType.LOGICAL_TOPN).addChildren(
Pattern.create(OperatorType.LOGICAL_JOIN, OperatorType.PATTERN_LEAF, OperatorType.PATTERN_LEAF)));
}

@Override
public boolean check(OptExpression input, OptimizerContext context) {
LogicalTopNOperator topn = (LogicalTopNOperator) input.getOp();

if (topn.getLimit() > context.getSessionVariable().getCboPushDownTopNLimit()) {
return false;
}

OptExpression childExpr = input.inputAt(0);
LogicalJoinOperator joinOperator = childExpr.getOp().cast();
JoinOperator joinType = joinOperator.getJoinType();

if (!joinType.isLeftOuterJoin() && !joinType.isRightOuterJoin()) {
return false;
}

if (joinOperator.getPredicate() != null) {
return false;
}

OptExpression joinChild = null;
if (joinType.isLeftOuterJoin()) {
joinChild = childExpr.inputAt(0);
} else if (joinType.isRightJoin()) {
joinChild = childExpr.inputAt(1);
}

if (topn.hasLimit() && joinChild.getOp().hasLimit() && topn.getLimit() >= joinChild.getOp().getLimit()) {
return false;
}

List<Integer> colIds = topn.getOrderByElements().stream()
.map(Ordering::getColumnRef)
.map(ColumnRefOperator::getId)
.collect(Collectors.toList());

return joinChild.getOutputColumns().containsAll(colIds);
}

@Override
public List<OptExpression> transform(OptExpression input, OptimizerContext context) {
LogicalTopNOperator topn = input.getOp().cast();
OptExpression childExpr = input.inputAt(0);
LogicalJoinOperator joinOperator = childExpr.getOp().cast();

OptExpression joinChildWithSort;
if (joinOperator.getJoinType().isLeftOuterJoin()) {
joinChildWithSort = childExpr.inputAt(0);
} else {
joinChildWithSort = childExpr.inputAt(1);
}

OptExpression newTopNOperator = OptExpression.create(new LogicalTopNOperator.Builder()
.setOrderByElements(topn.getOrderByElements())
.setLimit(topn.getLimit())
.setTopNType(topn.getTopNType())
.setSortPhase(topn.getSortPhase())
.setIsSplit(false)
.build(), joinChildWithSort);

OptExpression newJoinOperator;
if (joinOperator.getJoinType().isLeftOuterJoin()) {
newJoinOperator = OptExpression.create(joinOperator,
Lists.newArrayList(newTopNOperator, childExpr.inputAt(1)));
} else {
newJoinOperator = OptExpression.create(joinOperator,
Lists.newArrayList(childExpr.inputAt(0), newTopNOperator));
}

return Collections.singletonList(OptExpression.create(topn, newJoinOperator));
}
}

0 comments on commit 30e1545

Please sign in to comment.