Skip to content
Permalink
Browse files
DRILL-6502: Rename CorrelatePrel to LateralJoinPrel.
closes #1325
  • Loading branch information
HanumathRao authored and vdiravka committed Jun 22, 2018
1 parent 9299fcc commit 9c7e55798259ff4ac1aca102539731348e2c9bd3
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 69 deletions.
@@ -55,7 +55,7 @@
import org.apache.drill.exec.planner.logical.partition.ParquetPruneScanRule;
import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
import org.apache.drill.exec.planner.physical.ConvertCountToDirectScan;
import org.apache.drill.exec.planner.physical.CorrelatePrule;
import org.apache.drill.exec.planner.physical.LateralJoinPrule;
import org.apache.drill.exec.planner.physical.DirectScanPrule;
import org.apache.drill.exec.planner.physical.FilterPrule;
import org.apache.drill.exec.planner.physical.HashAggPrule;
@@ -461,7 +461,7 @@ static RuleSet getPhysicalRules(OptimizerRulesContext optimizerRulesContext) {
ruleList.add(DirectScanPrule.INSTANCE);

ruleList.add(UnnestPrule.INSTANCE);
ruleList.add(CorrelatePrule.INSTANCE);
ruleList.add(LateralJoinPrule.INSTANCE);

ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_PROJECT);
ruleList.add(DrillPushLimitToScanRule.LIMIT_ON_SCAN);
@@ -32,9 +32,9 @@
import org.apache.drill.exec.planner.physical.PrelUtil;


public abstract class DrillCorrelateRelBase extends Correlate implements DrillRelNode {
public DrillCorrelateRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
public abstract class DrillLateralJoinRelBase extends Correlate implements DrillRelNode {
public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
}

@@ -33,7 +33,7 @@ public class DrillCorrelateRule extends RelOptRule {
private DrillCorrelateRule() {
super(RelOptHelper.any(LogicalCorrelate.class, Convention.NONE),
DrillRelFactories.LOGICAL_BUILDER,
"DrillCorrelateRule");
"DrillLateralJoinRule");
}

@Override
@@ -45,9 +45,9 @@ public void onMatch(RelOptRuleCall call) {
final RelNode convertedRight = convert(right, right.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());

final RelTraitSet traits = correlate.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
DrillCorrelateRel correlateRel = new DrillCorrelateRel(correlate.getCluster(),
DrillLateralJoinRel lateralJoinRel = new DrillLateralJoinRel(correlate.getCluster(),
traits, convertedLeft, convertedRight, correlate.getCorrelationId(),
correlate.getRequiredColumns(), correlate.getJoinType());
call.transformTo(correlateRel);
call.transformTo(lateralJoinRel);
}
}
@@ -26,23 +26,23 @@
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.common.logical.data.LateralJoin;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.exec.planner.common.DrillCorrelateRelBase;
import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;

import java.util.List;


public class DrillCorrelateRel extends DrillCorrelateRelBase implements DrillRel {
public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements DrillRel {

protected DrillCorrelateRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
}

@Override
public Correlate copy(RelTraitSet traitSet,
RelNode left, RelNode right, CorrelationId correlationId,
ImmutableBitSet requiredColumns, SemiJoinType joinType) {
return new DrillCorrelateRel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
this.getJoinType());
}

@@ -32,7 +32,7 @@
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.planner.common.DrillCorrelateRelBase;
import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
import org.apache.drill.exec.planner.common.DrillJoinRelBase;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema;
@@ -41,18 +41,18 @@
import java.util.Iterator;
import java.util.List;

public class CorrelatePrel extends DrillCorrelateRelBase implements Prel {
public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel {


protected CorrelatePrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
protected LateralJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
}
@Override
public Correlate copy(RelTraitSet traitSet,
RelNode left, RelNode right, CorrelationId correlationId,
ImmutableBitSet requiredColumns, SemiJoinType joinType) {
return new CorrelatePrel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
return new LateralJoinPrel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
this.getJoinType());
}

@@ -72,7 +72,7 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
* Check to make sure that the fields of the inputs are the same as the output field names.
* If not, insert a project renaming them.
*/
public RelNode getCorrelateInput(int offset, RelNode input) {
public RelNode getLateralInput(int offset, RelNode input) {
Preconditions.checkArgument(DrillJoinRelBase.uniqueFieldNames(input.getRowType()));
final List<String> fields = getRowType().getFieldNames();
final List<String> inputFields = input.getRowType().getFieldNames();
@@ -106,7 +106,7 @@ private RelNode rename(RelNode input, List<RelDataTypeField> inputFields, List<S

@Override
public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
return visitor.visitCorrelate(this, value);
return visitor.visitLateral(this, value);
}

@Override
@@ -22,22 +22,22 @@
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.drill.exec.planner.logical.DrillCorrelateRel;
import org.apache.drill.exec.planner.logical.DrillLateralJoinRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;

public class CorrelatePrule extends Prule {
public static final RelOptRule INSTANCE = new CorrelatePrule("Prel.CorrelatePrule",
RelOptHelper.any(DrillCorrelateRel.class));
public class LateralJoinPrule extends Prule {
public static final RelOptRule INSTANCE = new LateralJoinPrule("Prel.LateralJoinPrule",
RelOptHelper.any(DrillLateralJoinRel.class));

private CorrelatePrule(String name, RelOptRuleOperand operand) {
private LateralJoinPrule(String name, RelOptRuleOperand operand) {
super(operand, name);
}

@Override
public void onMatch(RelOptRuleCall call) {
final DrillCorrelateRel correlate = call.rel(0);
final RelNode left = correlate.getLeft();
final RelNode right = correlate.getRight();
final DrillLateralJoinRel lateralJoinRel = call.rel(0);
final RelNode left = lateralJoinRel.getLeft();
final RelNode right = lateralJoinRel.getRight();
RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
RelTraitSet traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL);

@@ -46,11 +46,10 @@ public void onMatch(RelOptRuleCall call) {
final RelNode convertedLeft = convert(left, traitsLeft);
final RelNode convertedRight = convert(right, traitsRight);

final CorrelatePrel correlatePrel = new CorrelatePrel(correlate.getCluster(),
final LateralJoinPrel lateralJoinPrel = new LateralJoinPrel(lateralJoinRel.getCluster(),
corrTraits,
convertedLeft, convertedRight, correlate.getCorrelationId(),
correlate.getRequiredColumns(),correlate.getJoinType());
call.transformTo(correlatePrel);
convertedLeft, convertedRight, lateralJoinRel.getCorrelationId(),
lateralJoinRel.getRequiredColumns(),lateralJoinRel.getJoinType());
call.transformTo(lateralJoinPrel);
}

}
@@ -76,6 +76,6 @@ public boolean needsFinalColumnReordering() {
}

public Class<?> getParentClass() {
return CorrelatePrel.class;
return LateralJoinPrel.class;
}
}
@@ -32,7 +32,7 @@
import org.apache.calcite.runtime.FlatLists;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.util.Pair;
import org.apache.drill.exec.planner.physical.CorrelatePrel;
import org.apache.drill.exec.planner.physical.LateralJoinPrel;
import org.apache.drill.exec.planner.physical.HashJoinPrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
@@ -138,7 +138,7 @@ private String getDependentSrcOp(RelNode rel) {

private String getDependentSrcOp(UnnestPrel unnest) {
Prel parent = this.getRegisteredPrel(unnest.getParentClass());
if (parent != null && parent instanceof CorrelatePrel) {
if (parent != null && parent instanceof LateralJoinPrel) {
OpId id = ids.get(parent);
return String.format(" [srcOp=%02d-%02d] ", id.fragmentId, id.opId);
}
@@ -159,8 +159,8 @@ public void unRegister(Prel unregister) {


private void explainInputs(RelNode rel) {
if (rel instanceof CorrelatePrel) {
this.explainInputs((CorrelatePrel) rel);
if (rel instanceof LateralJoinPrel) {
this.explainInputs((LateralJoinPrel) rel);
} else {
List<RelNode> inputs = rel.getInputs();
if (rel instanceof HashJoinPrel && ((HashJoinPrel) rel).isSwapped()) {
@@ -173,13 +173,13 @@ private void explainInputs(RelNode rel) {
}
}

//Correlate is handled differently because explain plan
//Lateral is handled differently because explain plan
//needs to show relation between Lateral and Unnest operators.
private void explainInputs(CorrelatePrel correlate) {
correlate.getInput(0).explain(this);
this.register(correlate);
correlate.getInput(1).explain(this);
this.unRegister(correlate);
private void explainInputs(LateralJoinPrel lateralJoinPrel) {
lateralJoinPrel.getInput(0).explain(this);
this.register(lateralJoinPrel);
lateralJoinPrel.getInput(1).explain(this);
this.unRegister(lateralJoinPrel);
}

public final void explain(RelNode rel, List<Pair<String, Object>> valueList) {
@@ -25,7 +25,7 @@
import org.apache.drill.exec.planner.physical.ScreenPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
import org.apache.drill.exec.planner.physical.CorrelatePrel;
import org.apache.drill.exec.planner.physical.LateralJoinPrel;

public class BasePrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> implements PrelVisitor<RETURN, EXTRA, EXCEP> {

@@ -71,7 +71,7 @@ public RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP {
}

@Override
public RETURN visitCorrelate(CorrelatePrel prel, EXTRA value) throws EXCEP {
public RETURN visitLateral(LateralJoinPrel prel, EXTRA value) throws EXCEP {
return visitPrel(prel, value);
}

@@ -20,7 +20,7 @@
import java.util.Collections;
import java.util.List;
import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.planner.physical.CorrelatePrel;
import org.apache.drill.exec.planner.physical.LateralJoinPrel;
import org.apache.drill.exec.planner.physical.ExchangePrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ScanPrel;
@@ -32,7 +32,7 @@

public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, ExcessiveExchangeIdentifier.MajorFragmentStat, RuntimeException> {
private final long targetSliceSize;
private CorrelatePrel topMostLateralJoin = null;
private LateralJoinPrel topMostLateralJoin = null;

public ExcessiveExchangeIdentifier(long targetSliceSize) {
this.targetSliceSize = targetSliceSize;
@@ -83,7 +83,7 @@ public Prel visitScan(ScanPrel prel, MajorFragmentStat s) throws RuntimeExceptio
}

@Override
public Prel visitCorrelate(CorrelatePrel prel, MajorFragmentStat s) throws RuntimeException {
public Prel visitLateral(LateralJoinPrel prel, MajorFragmentStat s) throws RuntimeException {
List<RelNode> children = Lists.newArrayList();
s.add(prel);

@@ -20,7 +20,7 @@
import java.util.List;

import org.apache.drill.exec.planner.physical.JoinPrel;
import org.apache.drill.exec.planner.physical.CorrelatePrel;
import org.apache.drill.exec.planner.physical.LateralJoinPrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.calcite.rel.RelNode;

@@ -72,16 +72,16 @@ public Prel visitJoin(JoinPrel prel, Void value) throws RuntimeException {

//TODO: consolidate this code with join column renaming.
@Override
public Prel visitCorrelate(CorrelatePrel prel, Void value) throws RuntimeException {
public Prel visitLateral(LateralJoinPrel prel, Void value) throws RuntimeException {

List<RelNode> children = getChildren(prel);

final int leftCount = children.get(0).getRowType().getFieldCount();

List<RelNode> reNamedChildren = Lists.newArrayList();

RelNode left = prel.getCorrelateInput(0, children.get(0));
RelNode right = prel.getCorrelateInput(leftCount, children.get(1));
RelNode left = prel.getLateralInput(0, children.get(0));
RelNode right = prel.getLateralInput(leftCount, children.get(1));

reNamedChildren.add(left);
reNamedChildren.add(right);
@@ -25,20 +25,20 @@
import org.apache.drill.exec.planner.physical.ScreenPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
import org.apache.drill.exec.planner.physical.CorrelatePrel;
import org.apache.drill.exec.planner.physical.LateralJoinPrel;


public interface PrelVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PrelVisitor.class);
org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PrelVisitor.class);

public RETURN visitExchange(ExchangePrel prel, EXTRA value) throws EXCEP;
public RETURN visitScreen(ScreenPrel prel, EXTRA value) throws EXCEP;
public RETURN visitWriter(WriterPrel prel, EXTRA value) throws EXCEP;
public RETURN visitScan(ScanPrel prel, EXTRA value) throws EXCEP;
public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP;
public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP;
public RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP;
public RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP;
public RETURN visitCorrelate(CorrelatePrel prel, EXTRA value) throws EXCEP;
RETURN visitExchange(ExchangePrel prel, EXTRA value) throws EXCEP;
RETURN visitScreen(ScreenPrel prel, EXTRA value) throws EXCEP;
RETURN visitWriter(WriterPrel prel, EXTRA value) throws EXCEP;
RETURN visitScan(ScanPrel prel, EXTRA value) throws EXCEP;
RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP;
RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP;
RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP;
RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP;
RETURN visitLateral(LateralJoinPrel prel, EXTRA value) throws EXCEP;

}
@@ -25,7 +25,7 @@
import org.apache.drill.exec.planner.physical.ScreenPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
import org.apache.drill.exec.planner.physical.CorrelatePrel;
import org.apache.drill.exec.planner.physical.LateralJoinPrel;

/**
* Debug-time class that prints a PRel tree to the console for
@@ -234,7 +234,7 @@ public Void visitUnnest(UnnestPrel prel, VisualizationState value) throws Except
}

@Override
public Void visitCorrelate(CorrelatePrel prel, VisualizationState value) throws Exception {
public Void visitLateral(LateralJoinPrel prel, VisualizationState value) throws Exception {
visitPrel(prel, value);
return null;
}
@@ -79,7 +79,7 @@ public void testFilterPushCorrelate() throws Exception {
String query = "select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t,"
+ " unnest(t.orders) t2(ord) where t.c_name='customer1' AND t2.ord.o_shop='Meno Park 1st' ";

PlanTestBase.testPlanMatchingPatterns(query, new String[]{"Correlate(.*[\n\r])+.*Filter(.*[\n\r])+.*Scan(.*[\n\r])+.*Filter"},
PlanTestBase.testPlanMatchingPatterns(query, new String[]{"LateralJoin(.*[\n\r])+.*Filter(.*[\n\r])+.*Scan(.*[\n\r])+.*Filter"},
new String[]{});

testBuilder()
@@ -430,7 +430,7 @@ public void testNoExchangeWithLateralsDownStreamAgg() throws Exception {
}

private String getRightChildOfLateral(String explain) throws Exception {
Matcher matcher = Pattern.compile("Correlate.*Unnest", Pattern.MULTILINE | Pattern.DOTALL).matcher(explain);
Matcher matcher = Pattern.compile("LateralJoin.*Unnest", Pattern.MULTILINE | Pattern.DOTALL).matcher(explain);
assertTrue (matcher.find());
String CorrelateUnnest = matcher.group(0);
return CorrelateUnnest.substring(CorrelateUnnest.lastIndexOf("Scan"));
@@ -452,7 +452,7 @@ public void testLateralAndUnnestExplainPlan() throws Exception {
assertTrue(srcOp != null && srcOp.length() > 0);
String correlateFragmentPattern = srcOp.substring(srcOp.indexOf("=")+1, srcOp.indexOf("]"));
assertTrue(correlateFragmentPattern != null && correlateFragmentPattern.length() > 0);
Matcher matcher = Pattern.compile(correlateFragmentPattern + ".*Correlate", Pattern.MULTILINE | Pattern.DOTALL).matcher(explain);
Matcher matcher = Pattern.compile(correlateFragmentPattern + ".*LateralJoin", Pattern.MULTILINE | Pattern.DOTALL).matcher(explain);
assertTrue(matcher.find());
}
}

0 comments on commit 9c7e557

Please sign in to comment.