Skip to content
Permalink
Browse files
DRILL-6545: Projection Push down into Lateral Join operator.
closes #1347
  • Loading branch information
HanumathRao authored and vvysotskyi committed Jul 1, 2018
1 parent 7c22e35 commit 8ec2dc64175648103a5ec51f8ad98387496692a9
Show file tree
Hide file tree
Showing 15 changed files with 414 additions and 38 deletions.
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.AbstractJoinPop;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -34,26 +35,31 @@
public class LateralJoinPOP extends AbstractJoinPop {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class);

@JsonProperty("excludedColumns")
private List<SchemaPath> excludedColumns;

@JsonProperty("unnestForLateralJoin")
private UnnestPOP unnestForLateralJoin;

@JsonCreator
public LateralJoinPOP(
@JsonProperty("left") PhysicalOperator left,
@JsonProperty("right") PhysicalOperator right,
@JsonProperty("joinType") JoinRelType joinType) {
@JsonProperty("joinType") JoinRelType joinType,
@JsonProperty("excludedColumns") List<SchemaPath> excludedColumns) {
super(left, right, joinType, null, null);
Preconditions.checkArgument(joinType != JoinRelType.FULL,
"Full outer join is currently not supported with Lateral Join");
Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
"Right join is currently not supported with Lateral Join");
this.excludedColumns = excludedColumns;
}

@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.size() == 2,
"Lateral join should have two physical operators");
LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType);
LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType, this.excludedColumns);
newPOP.unnestForLateralJoin = this.unnestForLateralJoin;
return newPOP;
}
@@ -63,6 +69,11 @@ public UnnestPOP getUnnestForLateralJoin() {
return this.unnestForLateralJoin;
}

@JsonProperty("excludedColumns")
public List<SchemaPath> getExcludedColumns() {
return this.excludedColumns;
}

public void setUnnestForLateralJoin(UnnestPOP unnest) {
this.unnestForLateralJoin = unnest;
}
@@ -37,6 +37,8 @@
import org.apache.drill.exec.planner.logical.DrillJoinRule;
import org.apache.drill.exec.planner.logical.DrillLimitRule;
import org.apache.drill.exec.planner.logical.DrillMergeProjectRule;
import org.apache.drill.exec.planner.logical.DrillProjectLateralJoinTransposeRule;
import org.apache.drill.exec.planner.logical.DrillProjectPushIntoLateralJoinRule;
import org.apache.drill.exec.planner.logical.DrillProjectRule;
import org.apache.drill.exec.planner.logical.DrillPushFilterPastProjectRule;
import org.apache.drill.exec.planner.logical.DrillPushLimitToScanRule;
@@ -287,7 +289,8 @@ static RuleSet getDrillUserConfigurableLogicalRules(OptimizerRulesContext optimi
// Due to infinite loop in planning (DRILL-3257/CALCITE-1271), temporarily use this rule in Hep planner
// RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE,
DrillFilterAggregateTransposeRule.INSTANCE,

DrillProjectLateralJoinTransposeRule.INSTANCE,
DrillProjectPushIntoLateralJoinRule.INSTANCE,
RuleInstance.FILTER_MERGE_RULE,
RuleInstance.FILTER_CORRELATE_RULE,
RuleInstance.AGGREGATE_REMOVE_RULE,
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.planner.common;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
@@ -25,17 +27,27 @@
import org.apache.calcite.rel.core.Correlate;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.SemiJoinType;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.physical.PrelUtil;

import java.util.ArrayList;
import java.util.List;


public abstract class DrillLateralJoinRelBase extends Correlate implements DrillRelNode {
public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {

final private static double CORRELATE_MEM_COPY_COST = DrillCostBase.MEMORY_TO_CPU_RATIO * DrillCostBase.BASE_CPU_COST;
final public boolean excludeCorrelateColumn;
public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
this.excludeCorrelateColumn = excludeCorrelateCol;
}

@Override public RelOptCost computeSelfCost(RelOptPlanner planner,
@@ -49,7 +61,53 @@ public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNod
double rowSize = (this.getLeft().getRowType().getFieldList().size()) * fieldWidth;

double cpuCost = rowCount * rowSize * DrillCostBase.BASE_CPU_COST;
double memCost = 0;
double memCost = !excludeCorrelateColumn ? CORRELATE_MEM_COPY_COST : 0.0;
return costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost);
}

@Override
protected RelDataType deriveRowType() {
switch (joinType) {
case LEFT:
case INNER:
return constructRowType(SqlValidatorUtil.deriveJoinRowType(left.getRowType(),
right.getRowType(), joinType.toJoinType(),
getCluster().getTypeFactory(), null,
ImmutableList.<RelDataTypeField>of()));
case ANTI:
case SEMI:
return constructRowType(left.getRowType());
default:
throw new IllegalStateException("Unknown join type " + joinType);
}
}

public int getInputSize(int offset, RelNode input) {
if (this.excludeCorrelateColumn &&
offset == 0) {
return input.getRowType().getFieldList().size() - 1;
}
return input.getRowType().getFieldList().size();
}

public RelDataType constructRowType(RelDataType inputRowType) {
Preconditions.checkArgument(this.requiredColumns.cardinality() == 1);

List<RelDataType> fields = new ArrayList<>();
List<String> fieldNames = new ArrayList<>();
if (excludeCorrelateColumn) {
int corrVariable = this.requiredColumns.nextSetBit(0);

for (RelDataTypeField field : inputRowType.getFieldList()) {
if (field.getIndex() == corrVariable) {
continue;
}
fieldNames.add(field.getName());
fields.add(field.getType());
}

return getCluster().getTypeFactory().createStructType(fields, fieldNames);
}
return inputRowType;
}
}
@@ -18,9 +18,12 @@
package org.apache.drill.exec.planner.common;

import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
@@ -29,6 +32,7 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
@@ -282,4 +286,70 @@ public Void visitCall(RexCall call) {
}
return false;
}

/**
* InputRefVisitor is a utility class used to collect all the RexInputRef nodes in a
* RexNode.
*
*/
public static class InputRefVisitor extends RexVisitorImpl<Void> {
private final List<RexInputRef> inputRefList;

public InputRefVisitor() {
super(true);
inputRefList = new ArrayList<>();
}

public Void visitInputRef(RexInputRef ref) {
inputRefList.add(ref);
return null;
}

public Void visitCall(RexCall call) {
for (RexNode operand : call.operands) {
operand.accept(this);
}
return null;
}

public List<RexInputRef> getInputRefs() {
return inputRefList;
}
}


/**
* RexFieldsTransformer is a utility class used to convert column refs in a RexNode
* based on inputRefMap (input to output ref map).
*
* This transformer can be used to find and replace the existing inputRef in a RexNode with a new inputRef.
*/
public static class RexFieldsTransformer {
private final RexBuilder rexBuilder;
private final Map<Integer, Integer> inputRefMap;

public RexFieldsTransformer(
RexBuilder rexBuilder,
Map<Integer, Integer> inputRefMap) {
this.rexBuilder = rexBuilder;
this.inputRefMap = inputRefMap;
}

public RexNode go(RexNode rex) {
if (rex instanceof RexCall) {
ImmutableList.Builder<RexNode> builder = ImmutableList.builder();
final RexCall call = (RexCall) rex;
for (RexNode operand : call.operands) {
builder.add(go(operand));
}
return call.clone(call.getType(), builder.build());
} else if (rex instanceof RexInputRef) {
RexInputRef var = (RexInputRef) rex;
int index = var.getIndex();
return rexBuilder.makeInputRef(var.getType(), inputRefMap.get(index));
} else {
return rex;
}
}
}
}
@@ -46,7 +46,7 @@ public void onMatch(RelOptRuleCall call) {

final RelTraitSet traits = correlate.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
DrillLateralJoinRel lateralJoinRel = new DrillLateralJoinRel(correlate.getCluster(),
traits, convertedLeft, convertedRight, correlate.getCorrelationId(),
traits, convertedLeft, convertedRight, false, correlate.getCorrelationId(),
correlate.getRequiredColumns(), correlate.getJoinType());
call.transformTo(lateralJoinRel);
}
@@ -33,24 +33,24 @@

public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements DrillRel {

protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean includeCorrelateVar,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
super(cluster, traits, left, right, includeCorrelateVar, correlationId, requiredColumns, semiJoinType);
}

@Override
public Correlate copy(RelTraitSet traitSet,
RelNode left, RelNode right, CorrelationId correlationId,
ImmutableBitSet requiredColumns, SemiJoinType joinType) {
return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, this.excludeCorrelateColumn, correlationId, requiredColumns,
this.getJoinType());
}

@Override
public LogicalOperator implement(DrillImplementor implementor) {
final List<String> fields = getRowType().getFieldNames();
assert DrillJoinRel.isUnique(fields);
final int leftCount = left.getRowType().getFieldCount();
final int leftCount = getInputSize(0,left);

final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this);
final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this);
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.logical;


import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Correlate;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.rules.ProjectCorrelateTransposeRule;
import org.apache.calcite.rel.rules.PushProjector;
import org.apache.calcite.tools.RelBuilderFactory;

public class DrillProjectLateralJoinTransposeRule extends ProjectCorrelateTransposeRule {

public static final DrillProjectLateralJoinTransposeRule INSTANCE = new DrillProjectLateralJoinTransposeRule(PushProjector.ExprCondition.TRUE, RelFactories.LOGICAL_BUILDER);

public DrillProjectLateralJoinTransposeRule(PushProjector.ExprCondition preserveExprCondition, RelBuilderFactory relFactory) {
super(preserveExprCondition, relFactory);
}

@Override
public boolean matches(RelOptRuleCall call) {
Correlate correlate = call.rel(1);


// No need to call ProjectCorrelateTransposeRule if the current lateralJoin contains excludeCorrelationColumn set to true.
// This is needed as the project push into Lateral join rule changes the output row type which will fail assertions in ProjectCorrelateTransposeRule.
if (correlate instanceof DrillLateralJoinRel &&
((DrillLateralJoinRel)correlate).excludeCorrelateColumn) {
return false;
}

return true;
}
}

0 comments on commit 8ec2dc6

Please sign in to comment.