Skip to content

Commit

Permalink
DRILL-1957: Support nested loop join planning in order to enable NOT-…
Browse files Browse the repository at this point in the history
…IN, Inequality, Cartesian, uncorrelated EXISTS planning.

Add support for nested loop join planning where the right input is scalar and is broadcast.

Add check for scalar subquery for NLJ. Add support for creating a Filter-NLJ plan.

Rebase on the branch with Jinfeng's Calcite rebasing work.

Conflicts:
	exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java

Add unit tests for NLJoin.

Added test for inequality join.

Tests with BroadcastExchange, with HJ/MJ disabled.

Fix filter push down for NL joins by modifying row count computation for joins with always-true conditions. Rebase on master. Refactor unit tests.

Improved checking of preconditions for NL join.

Handle the case where scalar aggregate is a child of Filter.

DRILL-1957: Support nested loop join planning in order to enable NOT-IN, Inequality, EXISTS planning.

Better checks for cartesian and inequality joins. Rebase on latest master.

Refactor costing for logical join.  Add tests.  Enable more TPC-H tests.

Remove the check for cartesian join from DrillJoinRel constructor.

Clear left and right keys before calling splitJoinCondition.

Address review comments: Remove redundant call to getJoinCategory.  Added comment in DrillRuleSet.
  • Loading branch information
Aman Sinha committed Apr 29, 2015
1 parent 50c5197 commit c3b79ac
Show file tree
Hide file tree
Showing 20 changed files with 657 additions and 72 deletions.
Expand Up @@ -24,6 +24,9 @@
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.DrillFilterRel;

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.ErrorCollector;
Expand All @@ -38,6 +41,7 @@

import java.util.LinkedList;
import java.util.List;
import com.google.common.collect.Lists;

public class JoinUtils {
public static enum JoinComparator {
Expand All @@ -46,6 +50,12 @@ public static enum JoinComparator {
IS_NOT_DISTINCT_FROM // 'IS NOT DISTINCT FROM' comparator
}

/**
 * Classification of a join by the shape of its join condition, as produced by
 * {@code JoinUtils.getJoinCategory}.
 */
public static enum JoinCategory {
// equality join: the condition splits completely into left/right key pairs
EQUALITY, // equality join
// inequality join: <>, <, >; also used when a non-equi remainder is left over
// after splitting the condition, or when no left/right key pairs are found
INEQUALITY, // inequality join: <>, <, >
// no join condition, i.e. the condition is always true
CARTESIAN // no join condition
}

// Check the comparator for the join condition. Note that a similar check is also
// done in JoinPrel; however we have to repeat it here because a physical plan
// may be submitted directly to Drill.
Expand Down Expand Up @@ -194,4 +204,44 @@ public static void addLeastRestrictiveCasts(LogicalExpression[] leftExpressions,
}
}
}

/**
 * Checks whether the given rel is, or sits on top of, an aggregate without
 * grouping keys.
 *
 * <p>Starting at {@code childrel}, the walk steps through {@link DrillFilterRel}
 * nodes (via their first input) and {@link RelSubset} nodes (via their current
 * best rel) until it reaches a {@link DrillAggregateRel}, a {@code null}, or any
 * other kind of node.
 *
 * @param childrel the rel at which to start the walk
 * @return {@code true} only if a {@link DrillAggregateRel} is reached and its
 *         group set is empty; {@code false} in every other case
 */
public static boolean isScalarSubquery(RelNode childrel) {
  RelNode node = childrel;
  while (node != null) {
    if (node instanceof DrillAggregateRel) {
      // First aggregate found decides the answer: scalar iff there are no group keys.
      return ((DrillAggregateRel) node).getGroupSet().isEmpty();
    }
    if (node instanceof DrillFilterRel) {
      node = node.getInput(0);
    } else if (node instanceof RelSubset) {
      // May yield null, in which case the loop condition ends the walk.
      node = ((RelSubset) node).getBest();
    } else {
      // Any other operator stops the search.
      return false;
    }
  }
  return false;
}

/**
 * Classifies a join condition as equality, inequality or cartesian.
 *
 * <p>An always-true condition is reported as {@link JoinCategory#CARTESIAN}
 * without touching the key lists. Otherwise both key lists are cleared and
 * then repopulated by {@code RelOptUtil.splitJoinCondition}.
 *
 * @param left      left input of the join
 * @param right     right input of the join
 * @param condition join condition to classify
 * @param leftKeys  output list that receives the left-side key ordinals
 * @param rightKeys output list that receives the right-side key ordinals
 * @return {@link JoinCategory#EQUALITY} when the condition splits with nothing
 *         remaining and both key lists are non-empty;
 *         {@link JoinCategory#CARTESIAN} when the condition is always true;
 *         {@link JoinCategory#INEQUALITY} otherwise
 */
public static JoinCategory getJoinCategory(RelNode left, RelNode right, RexNode condition,
    List<Integer> leftKeys, List<Integer> rightKeys) {
  if (condition.isAlwaysTrue()) {
    return JoinCategory.CARTESIAN;
  }

  // Start from empty lists so only keys extracted from this condition are reported.
  leftKeys.clear();
  rightKeys.clear();
  final RexNode nonEquiRemainder =
      RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);

  final boolean pureEquiJoin =
      nonEquiRemainder.isAlwaysTrue() && !leftKeys.isEmpty() && !rightKeys.isEmpty();

  // Anything that is not a pure equi-join is, for practical purposes, treated as inequality.
  return pureEquiJoin ? JoinCategory.EQUALITY : JoinCategory.INEQUALITY;
}

}
Expand Up @@ -21,11 +21,12 @@
import java.util.HashSet;
import java.util.List;

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.holders.IntHolder;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.physical.impl.join.JoinUtils;
import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.calcite.rel.InvalidRelException;
Expand Down Expand Up @@ -57,22 +58,32 @@ public DrillJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left,

@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
List<Integer> tmpLeftKeys = Lists.newArrayList();
List<Integer> tmpRightKeys = Lists.newArrayList();
RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, tmpLeftKeys, tmpRightKeys);
if (!remaining.isAlwaysTrue() || (tmpLeftKeys.size() == 0 || tmpRightKeys.size() == 0)) {
JoinCategory category = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys);
if (category == JoinCategory.CARTESIAN || category == JoinCategory.INEQUALITY) {
if (PrelUtil.getPlannerSettings(planner).isNestedLoopJoinEnabled()) {
if (PrelUtil.getPlannerSettings(planner).isNlJoinForScalarOnly()) {
if (hasScalarSubqueryInput()) {
return computeLogicalJoinCost(planner);
} else {
return ((DrillCostFactory)planner.getCostFactory()).makeInfiniteCost();
}
} else {
return computeLogicalJoinCost(planner);
}
}
return ((DrillCostFactory)planner.getCostFactory()).makeInfiniteCost();
}

// We do not know which join method, i.e HASH-join or MergeJoin, will be used in Logical Planning.
// Here, we assume to use Hash-join, since this is a more commonly-used Join method in Drill.
return computeHashJoinCost(planner);
// return super.computeSelfCost(planner);
return computeLogicalJoinCost(planner);
}

@Override
public double getRows() {
return joinRowFactor * Math.max(this.getLeft().getRows(), this.getRight().getRows());
if (this.condition.isAlwaysTrue()) {
return joinRowFactor * this.getLeft().getRows() * this.getRight().getRows();
} else {
return joinRowFactor * Math.max(this.getLeft().getRows(), this.getRight().getRows());
}
}

/**
Expand All @@ -98,6 +109,17 @@ public List<Integer> getRightKeys() {
return this.rightKeys;
}

/**
 * Computes the cost of this join for logical planning by delegating to
 * {@code computeHashJoinCost}.
 *
 * @param planner the planner, passed through to the hash join cost computation
 * @return the cost produced by the hash join cost computation
 */
protected RelOptCost computeLogicalJoinCost(RelOptPlanner planner) {
// During logical planning, although we don't care much about the actual physical join that will
// be chosen, we do care about which table - bigger or smaller - is chosen as the right input
// of the join, since that is important at least for hash join and we don't currently have a
// hybrid hash join that can swap the inputs dynamically. The Calcite planner's default cost of a
// join is the same whether the bigger table is used as the left input or the right. To overcome
// that, we use the hash join cost as the logical cost so that the cardinality of the left and
// right inputs is considered appropriately.
return computeHashJoinCost(planner);
}

protected RelOptCost computeHashJoinCost(RelOptPlanner planner) {
double probeRowCount = RelMetadataQuery.getRowCount(this.getLeft());
double buildRowCount = RelMetadataQuery.getRowCount(this.getRight());
Expand Down Expand Up @@ -131,4 +153,13 @@ protected RelOptCost computeHashJoinCost(RelOptPlanner planner) {
return costFactory.makeCost(buildRowCount + probeRowCount, cpuCost, 0, 0, memCost);

}
/**
 * Reports whether either input of this join is recognized as a scalar subquery
 * by {@code JoinUtils.isScalarSubquery}. The left input is checked first; the
 * right input is only checked when the left one is not scalar.
 *
 * @return {@code true} if the left or the right input is a scalar subquery
 */
private boolean hasScalarSubqueryInput() {
  return JoinUtils.isScalarSubquery(getLeft()) || JoinUtils.isScalarSubquery(getRight());
}

}
Expand Up @@ -31,7 +31,7 @@


public class DrillFilterRel extends DrillFilterRelBase implements DrillRel {
protected DrillFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
public DrillFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
super(DRILL_LOGICAL, cluster, traits, child, condition);
}

Expand Down
Expand Up @@ -51,26 +51,14 @@ public DrillJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, Rel
JoinRelType joinType) throws InvalidRelException {
super(cluster, traits, left, right, condition, joinType);

RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
if (!remaining.isAlwaysTrue() && (leftKeys.size() == 0 || rightKeys.size() == 0)) {
// throw new InvalidRelException("DrillJoinRel only supports equi-join");
}
RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
}

public DrillJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
JoinRelType joinType, List<Integer> leftKeys, List<Integer> rightKeys, boolean checkCartesian) throws InvalidRelException {
JoinRelType joinType, List<Integer> leftKeys, List<Integer> rightKeys) throws InvalidRelException {
super(cluster, traits, left, right, condition, joinType);

assert (leftKeys != null && rightKeys != null);

if (checkCartesian) {
List<Integer> tmpLeftKeys = Lists.newArrayList();
List<Integer> tmpRightKeys = Lists.newArrayList();
RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, tmpLeftKeys, tmpRightKeys);
if (!remaining.isAlwaysTrue() && (tmpLeftKeys.size() == 0 || tmpRightKeys.size() == 0)) {
// throw new InvalidRelException("DrillJoinRel only supports equi-join");
}
}
this.leftKeys = leftKeys;
this.rightKeys = rightKeys;
}
Expand Down
Expand Up @@ -97,7 +97,7 @@ public void onMatch(RelOptRuleCall call) {
newJoinCondition = RexUtil.composeConjunction(builder, equijoinList, false);
} else {
// tracer.warning("Non-equijoins are only supported in the presence of an equijoin.");
return;
// return;
}
}
//else {
Expand All @@ -108,11 +108,11 @@ public void onMatch(RelOptRuleCall call) {
try {
if (!addFilter) {
RelNode joinRel = new DrillJoinRel(join.getCluster(), traits, convertedLeft, convertedRight, origJoinCondition,
join.getJoinType(), leftKeys, rightKeys, false);
join.getJoinType(), leftKeys, rightKeys);
call.transformTo(joinRel);
} else {
RelNode joinRel = new DrillJoinRel(join.getCluster(), traits, convertedLeft, convertedRight, newJoinCondition,
join.getJoinType(), leftKeys, rightKeys, false);
join.getJoinType(), leftKeys, rightKeys);
call.transformTo(new DrillFilterRel(join.getCluster(), traits, joinRel, remaining));
}
} catch (InvalidRelException e) {
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.drill.exec.planner.physical.HashJoinPrule;
import org.apache.drill.exec.planner.physical.LimitPrule;
import org.apache.drill.exec.planner.physical.MergeJoinPrule;
import org.apache.drill.exec.planner.physical.NestedLoopJoinPrule;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.ProjectPrule;
import org.apache.drill.exec.planner.physical.PushLimitToTopN;
Expand Down Expand Up @@ -235,6 +236,12 @@ public static final RuleSet getPhysicalRules(QueryContext qcontext) {

}

// NLJ plans consist of broadcasting the right child, hence we need
// broadcast join enabled.
if (ps.isNestedLoopJoinEnabled() && ps.isBroadcastJoinEnabled()) {
ruleList.add(NestedLoopJoinPrule.INSTANCE);
}

return new DrillRuleSet(ImmutableSet.copyOf(ruleList));
}

Expand Down
Expand Up @@ -24,14 +24,16 @@
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.impl.join.JoinUtils;
import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexNode;

Expand All @@ -50,7 +52,7 @@ public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, Rel
JoinRelType joinType, boolean swapped) throws InvalidRelException {
super(cluster, traits, left, right, condition, joinType);
this.swapped = swapped;
RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys);
}

@Override
Expand All @@ -67,6 +69,9 @@ public RelOptCost computeSelfCost(RelOptPlanner planner) {
if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
return super.computeSelfCost(planner).multiplyBy(.1);
}
if (joincategory == JoinCategory.CARTESIAN || joincategory == JoinCategory.INEQUALITY) {
return ((DrillCostFactory)planner.getCostFactory()).makeInfiniteCost();
}
return computeHashJoinCost(planner);
}

Expand Down
Expand Up @@ -48,15 +48,16 @@ public boolean matches(RelOptRuleCall call) {

@Override
public void onMatch(RelOptRuleCall call) {
if (!PrelUtil.getPlannerSettings(call.getPlanner()).isHashJoinEnabled()) {
PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
if (!settings.isHashJoinEnabled()) {
return;
}

final DrillJoinRel join = (DrillJoinRel) call.rel(0);
final RelNode left = join.getLeft();
final RelNode right = join.getRight();

if (!checkPreconditions(join, left, right)) {
if (!checkPreconditions(join, left, right, settings)) {
return;
}

Expand All @@ -65,10 +66,12 @@ public void onMatch(RelOptRuleCall call) {
try {

if(isDist){
createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */, hashSingleKey);
createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN,
left, right, null /* left collation */, null /* right collation */, hashSingleKey);
}else{
if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
createBroadcastPlan(call, join, PhysicalJoinType.HASH_JOIN, left, right, null /* left collation */, null /* right collation */);
createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.HASH_JOIN,
left, right, null /* left collation */, null /* right collation */);
}
}

Expand Down
Expand Up @@ -23,6 +23,7 @@

import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.exec.physical.impl.join.JoinUtils;
import org.apache.drill.exec.planner.common.DrillJoinRelBase;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.calcite.rel.InvalidRelException;
Expand All @@ -47,6 +48,8 @@
*/
public abstract class JoinPrel extends DrillJoinRelBase implements Prel{

protected JoinUtils.JoinCategory joincategory;

public JoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
JoinRelType joinType) throws InvalidRelException{
super(cluster, traits, left, right, condition, joinType);
Expand Down

0 comments on commit c3b79ac

Please sign in to comment.