Skip to content

Commit

Permalink
[Feature] Runtime Filtering for Doris (Background, Configuration, FE …
Browse files Browse the repository at this point in the history
…Implement, Tuning, Test )
  • Loading branch information
Xinyi Zou committed Jul 5, 2021
1 parent d6e6c78 commit f9786d5
Show file tree
Hide file tree
Showing 31 changed files with 2,151 additions and 98 deletions.
46 changes: 46 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.BetweenToCompoundRule;
import org.apache.doris.rewrite.ExprRewriteRule;
Expand Down Expand Up @@ -148,6 +149,9 @@ public class Analyzer {
private boolean isUDFAllowed = true;
// timezone specified for some operation, such as broker load
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;

// The runtime filter that is expected to be used
private final List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();

public void setIsSubquery() {
isSubquery = true;
Expand All @@ -163,6 +167,10 @@ public void setIsSubquery() {
public void setTimezone(String timezone) { this.timezone = timezone; }
public String getTimezone() { return timezone; }

public void putAssignedRuntimeFilter(RuntimeFilter rf) { assignedRuntimeFilters.add(rf); }
public List<RuntimeFilter> getAssignedRuntimeFilter() { return assignedRuntimeFilters; }
public void clearAssignedRuntimeFilters() { assignedRuntimeFilters.clear(); }

// state shared between all objects of an Analyzer tree
// TODO: Many maps here contain properties about tuples, e.g., whether
// a tuple is outer/semi joined, etc. Remove the maps in favor of making
Expand Down Expand Up @@ -551,6 +559,10 @@ public TupleDescriptor getTupleDesc(TupleId id) {
return globalState.descTbl.getTupleDesc(id);
}

public SlotDescriptor getSlotDesc(SlotId id) {
return globalState.descTbl.getSlotDesc(id);
}

/**
* Given a "table alias"."column alias", return the SlotDescriptor
*
Expand Down Expand Up @@ -1809,4 +1821,38 @@ public void markRefdSlots(Analyzer analyzer, PlanNode planRoot,
}
}
}

/**
* Column conduction, can slot a value-transfer to slot b
*
* TODO(zxy) Use value-transfer graph to check
*/
public boolean hasValueTransfer(SlotId a, SlotId b) {
return a.equals(b);
}

/**
* Returns sorted slot IDs with value transfers from 'srcSid'.
* Time complexity: O(V) where V = number of slots
*
* TODO(zxy) Use value-transfer graph to check
*/
public List<SlotId> getValueTransferTargets(SlotId srcSid) {
List<SlotId> result = new ArrayList<>();
result.add(srcSid);
return result;
}

/**
* Returns true if any of the given slot ids or their value-transfer targets belong
* to an outer-joined tuple.
*/
public boolean hasOuterJoinedValueTransferTarget(List<SlotId> sids) {
for (SlotId srcSid: sids) {
for (SlotId dstSid: getValueTransferTargets(srcSid)) {
if (isOuterJoined(getTupleId(dstSid))) return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public Operator converse() {

public boolean isEquivalence() { return this == EQ || this == EQ_FOR_NULL; };

public boolean isUnNullSafeEquivalence() { return this == EQ; };

public boolean isUnequivalence() { return this == NE; }
}

Expand Down
37 changes: 37 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -1704,4 +1704,41 @@ public Expr getResultValue() throws AnalysisException {
final Expr newExpr = ExpressionFunctions.INSTANCE.evalExpr(this);
return newExpr != null ? newExpr : this;
}

public String getStringValue() {
if (this instanceof LiteralExpr) {
return ((LiteralExpr) this).getStringValue();
}
return "";
}

public static Expr getFirstBoundChild(Expr expr, List<TupleId> tids) {
for (Expr child: expr.getChildren()) {
if (child.isBoundByTupleIds(tids)) return child;
}
return null;
}

/**
* Returns true if expr contains specify function, otherwise false.
*/
public boolean isContainsFunction(String functionName) {
if (fn == null) return false;
if (fn.functionName().equalsIgnoreCase(functionName)) return true;
for (Expr child: children) {
if (child.isContainsFunction(functionName)) return true;
}
return false;
}

/**
* Returns true if expr contains specify className, otherwise false.
*/
public boolean isContainsClass(String className) {
if (this.getClass().getName().equalsIgnoreCase(className)) return true;
for (Expr child: children) {
if (child.isContainsClass(className)) return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public static boolean isEquivalencePredicate(Expr expr) {
&& ((BinaryPredicate) expr).getOp().isEquivalence();
}

public static boolean isUnNullSafeEquivalencePredicate(Expr expr) {
return (expr instanceof BinaryPredicate)
&& ((BinaryPredicate) expr).getOp().isUnNullSafeEquivalence();
}

public static boolean canPushDownPredicate(Expr expr) {
if (!(expr instanceof Predicate)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.io.Text;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.qe.VariableVarConverters;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TExprNodeType;
import org.apache.doris.thrift.TStringLiteral;
Expand All @@ -43,18 +43,8 @@
public class StringLiteral extends LiteralExpr {
private static final Logger LOG = LogManager.getLogger(StringLiteral.class);
private String value;
/**
* the session variable `sql_mode` is a special kind of variable.
* it's real type is int, so when querying `select @@sql_mode`, the return column
* type is "int". but user usually set this variable by string, such as:
* `set @@sql_mode = 'STRICT_TRANS_TABLES'`
* or
* `set @@sql_mode = concat(@@sql_mode, 'STRICT_TRANS_TABLES')'`
* <p>
* So when it need to be cast to int, it means "cast 'STRICT_TRANS_TABLES' to Integer".
* To support this, we set `isSqlMode` to true, so that it can cast sql mode name to integer.
*/
private boolean isSqlMode = false;
// Means the converted session variable need to be cast to int, such as "cast 'STRICT_TRANS_TABLES' to Integer".
private String beConverted = "";

public StringLiteral() {
super();
Expand All @@ -73,8 +63,8 @@ protected StringLiteral(StringLiteral other) {
value = other.value;
}

public void setIsSqlMode(boolean val) {
this.isSqlMode = val;
public void setBeConverted(String val) {
this.beConverted = val;
}

@Override
Expand Down Expand Up @@ -203,20 +193,18 @@ protected Expr uncheckedCastTo(Type targetType) throws AnalysisException {
case SMALLINT:
case INT:
case BIGINT:
if (isSqlMode) {
if (VariableVarConverters.hasConverter(beConverted)) {
try {
long sqlMode = SqlModeHelper.encode(value);
return new IntLiteral(sqlMode, targetType);
return new IntLiteral(VariableVarConverters.encode(beConverted, value), targetType);
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
}
}
return new IntLiteral(value, targetType);
case LARGEINT:
if (isSqlMode) {
if (VariableVarConverters.hasConverter(beConverted)) {
try {
long sqlMode = SqlModeHelper.encode(value);
return new LargeIntLiteral(String.valueOf(sqlMode));
return new LargeIntLiteral(String.valueOf(VariableVarConverters.encode(beConverted, value)));
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.qe.VariableVarConverters;
import org.apache.doris.thrift.TBoolLiteral;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TExprNodeType;
Expand Down Expand Up @@ -72,10 +71,10 @@ public Expr clone() {
@Override
public void analyzeImpl(Analyzer analyzer) throws AnalysisException {
VariableMgr.fillValue(analyzer.getContext().getSessionVariable(), this);
if (!Strings.isNullOrEmpty(name) && name.equalsIgnoreCase(SessionVariable.SQL_MODE)) {
if (!Strings.isNullOrEmpty(name) && VariableVarConverters.hasConverter(name)) {
setType(Type.VARCHAR);
try {
setStringValue(SqlModeHelper.decode(intValue));
setStringValue(VariableVarConverters.decode(name, intValue));
} catch (DdlException e) {
ErrorReport.reportAnalysisException(e.getMessage());
}
Expand Down Expand Up @@ -117,16 +116,13 @@ public Expr getLiteralExpr() {
@Override
public Expr getResultValue() throws AnalysisException {
Expr expr = super.getResultValue();
if (!Strings.isNullOrEmpty(name) && name.equalsIgnoreCase(SessionVariable.SQL_MODE)) {
// SQL_MODE is a special variable. Its type is int, but it is usually set using a string.
// Such as `set sql_mode = concat(@@sql_mode, "STRICT_TRANS_TABLES");`
// So we return the string type here so that it can correctly match the subsequent function signature.
// We will convert the string to int in VariableMgr.
// And we also set `isSqlMode` to true in StringLiteral, so that it can be cast back
if (!Strings.isNullOrEmpty(name) && VariableVarConverters.hasConverter(name)) {
// Return the string type here so that it can correctly match the subsequent function signature.
// And we also set `beConverted` to session variable name in StringLiteral, so that it can be cast back
// to Integer when returning value.
try {
StringLiteral s = new StringLiteral(SqlModeHelper.decode(intValue));
s.setIsSqlMode(true);
StringLiteral s = new StringLiteral(VariableVarConverters.decode(name, intValue));
s.setBeConverted(name);
return s;
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,24 @@ private static boolean requiresNullWrapping(Expr expr, Analyzer analyzer) {
public String toSqlImpl() {
return "TupleIsNull(" + Joiner.on(",").join(tupleIds) + ")";
}

/**
* Recursive function that replaces all 'IF(TupleIsNull(), NULL, e)' exprs in
* 'expr' with e and returns the modified expr.
*/
public static Expr unwrapExpr(Expr expr) {
if (expr instanceof FunctionCallExpr) {
FunctionCallExpr fnCallExpr = (FunctionCallExpr) expr;
List<Expr> params = fnCallExpr.getParams().exprs();
if (fnCallExpr.getFnName().getFunction().equals("if") &&
params.get(0) instanceof TupleIsNullPredicate &&
Expr.IS_NULL_LITERAL.apply(params.get(1))) {
return unwrapExpr(params.get(2));
}
}
for (int i = 0; i < expr.getChildren().size(); ++i) {
expr.setChild(i, unwrapExpr(expr.getChild(i)));
}
return expr;
}
}
39 changes: 39 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/util/BitUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

public class BitUtil {

// Returns the log2 of 'val'. 'val' must be > 0.
public static int log2Ceiling(long val) {
// Formula is based on the Long.numberOfLeadingZeros() javadoc comment.
return 64 - Long.numberOfLeadingZeros(val - 1);
}

// Round up 'val' to the nearest power of two. 'val' must be > 0.
public static long roundUpToPowerOf2(long val) {
return 1L << log2Ceiling(val);
}

// Round up 'val' to the nearest multiple of a power-of-two 'factor'.
// 'val' must be > 0.
public static long roundUpToPowerOf2Factor(long val, long factor) {
return (val + (factor - 1)) & ~(factor - 1);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,6 @@ && isBroadcastCostSmaller(broadcastCost, partitionCost)
node.setChild(0, leftChildFragment.getPlanRoot());
connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
leftChildFragment.setPlanRoot(node);

return leftChildFragment;
} else {
node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
Expand Down Expand Up @@ -472,8 +471,6 @@ && isBroadcastCostSmaller(broadcastCost, partitionCost)
rightChildFragment.setDestination(rhsExchange);
rightChildFragment.setOutputPartition(rhsJoinPartition);

// TODO: Before we support global runtime filter, only shuffle join do not enable local runtime filter
node.setIsPushDown(false);
return joinFragment;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public class HashJoinNode extends PlanNode {
private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
// join conjuncts from the JOIN clause that aren't equi-join predicates
private List<Expr> otherJoinConjuncts;
private boolean isPushDown = false;
private DistributionMode distrMode;
private boolean isColocate = false; //the flag for colocate join
private String colocateReason = ""; // if can not do colocate join, set reason here
Expand All @@ -85,9 +84,6 @@ public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner, TableRef inne
this.otherJoinConjuncts = otherJoinConjuncts;
children.add(outer);
children.add(inner);
if (this.joinOp.isInnerJoin() || this.joinOp.isLeftSemiJoin()) {
this.isPushDown = true;
}

// Inherits all the nullable tuple from the children
// Mark tuples that form the "nullable" side of the outer join as nullable.
Expand Down Expand Up @@ -282,10 +278,6 @@ public void getMaterializedIds(Analyzer analyzer, List<SlotId> ids) {
}
}

public void setIsPushDown(boolean isPushDown) {
this.isPushDown = isPushDown;
}

@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.HASH_JOIN_NODE;
Expand All @@ -300,7 +292,6 @@ protected void toThrift(TPlanNode msg) {
for (Expr e : otherJoinConjuncts) {
msg.hash_join_node.addToOtherJoinConjuncts(e.treeToThrift());
}
msg.hash_join_node.setIsPushDown(isPushDown);
}

@Override
Expand All @@ -326,6 +317,12 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve
if (!conjuncts.isEmpty()) {
output.append(detailPrefix).append("other predicates: ").append(getExplainString(conjuncts)).append("\n");
}
if (!runtimeFilters.isEmpty()) {
output.append(detailPrefix).append("runtime filters: ");
output.append(getRuntimeFilterExplainString(true));
}
output.append(detailPrefix).append(String.format(
"cardinality=%s", cardinality)).append("\n");
return output.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,10 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
output.append(prefix).append("PREDICATES: ").append(
getExplainString(conjuncts)).append("\n");
}
if (!runtimeFilters.isEmpty()) {
output.append(prefix).append("runtime filters: ");
output.append(getRuntimeFilterExplainString(false));
}

output.append(prefix).append(String.format(
"partitions=%s/%s",
Expand Down

0 comments on commit f9786d5

Please sign in to comment.