Skip to content

Commit

Permalink
DRILL-6617: Planner Side changed to propagate $drill_implicit_field$ …
Browse files Browse the repository at this point in the history
…information.
  • Loading branch information
HanumathRao authored and Sorabh Hamirwasia committed Jul 31, 2018
1 parent 1058bd9 commit e1f4a63
Show file tree
Hide file tree
Showing 37 changed files with 429 additions and 84 deletions.
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;


import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.adapter.jdbc.JdbcImplementor; import org.apache.calcite.adapter.jdbc.JdbcImplementor;
Expand Down Expand Up @@ -130,4 +131,10 @@ public SelectionVectorMode getEncoding() {
public boolean needsFinalColumnReordering() { public boolean needsFinalColumnReordering() {
return false; return false;
} }

@Override
public Prel addImplicitRowIDCol(List<RelNode> children) {
throw new UnsupportedOperationException("Adding Implicit RowID column is not supported for " +
this.getClass().getSimpleName() + " operator ");
}
} }
Expand Up @@ -51,6 +51,7 @@
import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.UnnestPOP; import org.apache.drill.exec.physical.config.UnnestPOP;
import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.physical.config.WindowPOP;
import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
import org.apache.drill.exec.rpc.UserClientConnection; import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.StoragePlugin; import org.apache.drill.exec.store.StoragePlugin;
Expand Down Expand Up @@ -235,7 +236,7 @@ public PhysicalOperator visitFilter(final Filter filter, final Object obj) throw


@Override @Override
public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) { public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) {
return new UnnestPOP(null, unnest.getColumn()); return new UnnestPOP(null, unnest.getColumn(), DrillUnnestRelBase.IMPLICIT_COLUMN);
} }
} }
} }
Expand Up @@ -38,6 +38,9 @@ public class LateralJoinPOP extends AbstractJoinPop {
@JsonProperty("excludedColumns") @JsonProperty("excludedColumns")
private List<SchemaPath> excludedColumns; private List<SchemaPath> excludedColumns;


@JsonProperty("implicitColumn")
private String implicitColumn;

@JsonProperty("unnestForLateralJoin") @JsonProperty("unnestForLateralJoin")
private UnnestPOP unnestForLateralJoin; private UnnestPOP unnestForLateralJoin;


Expand All @@ -46,20 +49,22 @@ public LateralJoinPOP(
@JsonProperty("left") PhysicalOperator left, @JsonProperty("left") PhysicalOperator left,
@JsonProperty("right") PhysicalOperator right, @JsonProperty("right") PhysicalOperator right,
@JsonProperty("joinType") JoinRelType joinType, @JsonProperty("joinType") JoinRelType joinType,
@JsonProperty("implicitColumn") String implicitColumn,
@JsonProperty("excludedColumns") List<SchemaPath> excludedColumns) { @JsonProperty("excludedColumns") List<SchemaPath> excludedColumns) {
super(left, right, joinType, null, null); super(left, right, joinType, null, null);
Preconditions.checkArgument(joinType != JoinRelType.FULL, Preconditions.checkArgument(joinType != JoinRelType.FULL,
"Full outer join is currently not supported with Lateral Join"); "Full outer join is currently not supported with Lateral Join");
Preconditions.checkArgument(joinType != JoinRelType.RIGHT, Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
"Right join is currently not supported with Lateral Join"); "Right join is currently not supported with Lateral Join");
this.excludedColumns = excludedColumns; this.excludedColumns = excludedColumns;
this.implicitColumn = implicitColumn;
} }


@Override @Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.size() == 2, Preconditions.checkArgument(children.size() == 2,
"Lateral join should have two physical operators"); "Lateral join should have two physical operators");
LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType, this.excludedColumns); LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType, this.implicitColumn, this.excludedColumns);
newPOP.unnestForLateralJoin = this.unnestForLateralJoin; newPOP.unnestForLateralJoin = this.unnestForLateralJoin;
return newPOP; return newPOP;
} }
Expand All @@ -78,6 +83,9 @@ public void setUnnestForLateralJoin(UnnestPOP unnest) {
this.unnestForLateralJoin = unnest; this.unnestForLateralJoin = unnest;
} }


@JsonProperty("implicitColumn")
public String getImplicitColumn() { return this.implicitColumn; }

@Override @Override
public int getOperatorType() { public int getOperatorType() {
return CoreOperatorType.LATERAL_JOIN_VALUE; return CoreOperatorType.LATERAL_JOIN_VALUE;
Expand Down
Expand Up @@ -39,6 +39,9 @@
public class UnnestPOP extends AbstractBase implements Leaf { public class UnnestPOP extends AbstractBase implements Leaf {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestPOP.class); static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestPOP.class);


@JsonProperty("implicitColumn")
private String implicitColumn;

private SchemaPath column; private SchemaPath column;


@JsonIgnore @JsonIgnore
Expand All @@ -47,14 +50,16 @@ public class UnnestPOP extends AbstractBase implements Leaf {
@JsonCreator @JsonCreator
public UnnestPOP( public UnnestPOP(
@JsonProperty("child") PhysicalOperator child, // Operator with incoming record batch @JsonProperty("child") PhysicalOperator child, // Operator with incoming record batch
@JsonProperty("column") SchemaPath column) { @JsonProperty("column") SchemaPath column,
@JsonProperty("implicitColumn") String implicitColumn) {
this.column = column; this.column = column;
this.implicitColumn = implicitColumn;
} }


@Override @Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
assert children.isEmpty(); assert children.isEmpty();
UnnestPOP newUnnest = new UnnestPOP(null, column); UnnestPOP newUnnest = new UnnestPOP(null, column, this.implicitColumn);
newUnnest.addUnnestBatch(this.unnestBatch); newUnnest.addUnnestBatch(this.unnestBatch);
return newUnnest; return newUnnest;
} }
Expand Down Expand Up @@ -82,6 +87,9 @@ public UnnestRecordBatch getUnnestBatch() {
return this.unnestBatch; return this.unnestBatch;
} }


@JsonProperty("implicitColumn")
public String getImplicitColumn() { return this.implicitColumn; }

@Override @Override
public int getOperatorType() { public int getOperatorType() {
return UNNEST_VALUE; return UNNEST_VALUE;
Expand Down
Expand Up @@ -42,6 +42,8 @@


public abstract class DrillLateralJoinRelBase extends Correlate implements DrillRelNode { public abstract class DrillLateralJoinRelBase extends Correlate implements DrillRelNode {


final public static String IMPLICIT_COLUMN = DrillRelOptUtil.IMPLICIT_COLUMN;

final private static double CORRELATE_MEM_COPY_COST = DrillCostBase.MEMORY_TO_CPU_RATIO * DrillCostBase.BASE_CPU_COST; final private static double CORRELATE_MEM_COPY_COST = DrillCostBase.MEMORY_TO_CPU_RATIO * DrillCostBase.BASE_CPU_COST;
final public boolean excludeCorrelateColumn; final public boolean excludeCorrelateColumn;
public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol, public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol,
Expand Down Expand Up @@ -71,7 +73,7 @@ protected RelDataType deriveRowType() {
case LEFT: case LEFT:
case INNER: case INNER:
return constructRowType(SqlValidatorUtil.deriveJoinRowType(left.getRowType(), return constructRowType(SqlValidatorUtil.deriveJoinRowType(left.getRowType(),
right.getRowType(), joinType.toJoinType(), removeImplicitField(right.getRowType()), joinType.toJoinType(),
getCluster().getTypeFactory(), null, getCluster().getTypeFactory(), null,
ImmutableList.of())); ImmutableList.of()));
case ANTI: case ANTI:
Expand Down Expand Up @@ -118,6 +120,21 @@ public RelDataType constructRowType(RelDataType inputRowType) {
return inputRowType; return inputRowType;
} }


public RelDataType removeImplicitField(RelDataType inputRowType) {
List<RelDataType> fields = new ArrayList<>();
List<String> fieldNames = new ArrayList<>();

for (RelDataTypeField field : inputRowType.getFieldList()) {
if (field.getName().equals(IMPLICIT_COLUMN)) {
continue;
}
fieldNames.add(field.getName());
fields.add(field.getType());
}

return getCluster().getTypeFactory().createStructType(fields, fieldNames);
}

@Override @Override
public double estimateRowCount(RelMetadataQuery mq) { public double estimateRowCount(RelMetadataQuery mq) {
return mq.getRowCount(left); return mq.getRowCount(left);
Expand Down
Expand Up @@ -55,6 +55,8 @@
*/ */
public abstract class DrillRelOptUtil { public abstract class DrillRelOptUtil {


final public static String IMPLICIT_COLUMN = "$drill_implicit_field$";

// Similar to RelOptUtil.areRowTypesEqual() with the additional check for allowSubstring // Similar to RelOptUtil.areRowTypesEqual() with the additional check for allowSubstring
public static boolean areRowTypesCompatible( public static boolean areRowTypesCompatible(
RelDataType rowType1, RelDataType rowType1,
Expand Down Expand Up @@ -317,6 +319,33 @@ public List<RexInputRef> getInputRefs() {
} }
} }


/**
* Given a list of rexnodes it transforms the rexnodes by changing the expr to use new index mapped to the old index.
* @param builder : RexBuilder from the planner.
* @param exprs: RexNodes to be transformed.
* @param corrMap: Mapping between old index to new index.
* @return
*/
public static List<RexNode> transformExprs(RexBuilder builder, List<RexNode> exprs, Map<Integer, Integer> corrMap) {
List<RexNode> outputExprs = new ArrayList<>();
DrillRelOptUtil.RexFieldsTransformer transformer = new DrillRelOptUtil.RexFieldsTransformer(builder, corrMap);
for (RexNode expr : exprs) {
outputExprs.add(transformer.go(expr));
}
return outputExprs;
}

/**
* Given a of rexnode it transforms the rexnode by changing the expr to use new index mapped to the old index.
* @param builder : RexBuilder from the planner.
* @param expr: RexNode to be transformed.
* @param corrMap: Mapping between old index to new index.
* @return
*/
public static RexNode transformExpr(RexBuilder builder, RexNode expr, Map<Integer, Integer> corrMap) {
DrillRelOptUtil.RexFieldsTransformer transformer = new DrillRelOptUtil.RexFieldsTransformer(builder, corrMap);
return transformer.go(expr);
}


/** /**
* RexFieldsTransformer is a utility class used to convert column refs in a RexNode * RexFieldsTransformer is a utility class used to convert column refs in a RexNode
Expand Down
Expand Up @@ -29,6 +29,7 @@
public class DrillUnnestRelBase extends AbstractRelNode implements DrillRelNode { public class DrillUnnestRelBase extends AbstractRelNode implements DrillRelNode {


final protected RexNode ref; final protected RexNode ref;
final public static String IMPLICIT_COLUMN = DrillRelOptUtil.IMPLICIT_COLUMN;


public DrillUnnestRelBase(RelOptCluster cluster, RelTraitSet traitSet, RexNode ref) { public DrillUnnestRelBase(RelOptCluster cluster, RelTraitSet traitSet, RexNode ref) {
super(cluster, traitSet); super(cluster, traitSet);
Expand Down
Expand Up @@ -22,14 +22,12 @@
import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories; import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilderFactory; import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.drill.exec.planner.StarColumnHelper; import org.apache.drill.exec.planner.StarColumnHelper;
import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.common.DrillRelOptUtil;


import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -80,7 +78,7 @@ public void onMatch(RelOptRuleCall call) {


if (!DrillRelOptUtil.isTrivialProject(origProj, true)) { if (!DrillRelOptUtil.isTrivialProject(origProj, true)) {
Map<Integer, Integer> mapWithoutCorr = buildMapWithoutCorrColumn(corr, correlationIndex); Map<Integer, Integer> mapWithoutCorr = buildMapWithoutCorrColumn(corr, correlationIndex);
List<RexNode> outputExprs = transformExprs(origProj.getCluster().getRexBuilder(), origProj.getChildExps(), mapWithoutCorr); List<RexNode> outputExprs = DrillRelOptUtil.transformExprs(origProj.getCluster().getRexBuilder(), origProj.getChildExps(), mapWithoutCorr);


relNode = new DrillProjectRel(origProj.getCluster(), relNode = new DrillProjectRel(origProj.getCluster(),
left.getTraitSet().plus(DrillRel.DRILL_LOGICAL), left.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
Expand All @@ -89,15 +87,6 @@ public void onMatch(RelOptRuleCall call) {
call.transformTo(relNode); call.transformTo(relNode);
} }


private List<RexNode> transformExprs(RexBuilder builder, List<RexNode> exprs, Map<Integer, Integer> corrMap) {
List<RexNode> outputExprs = new ArrayList<>();
DrillRelOptUtil.RexFieldsTransformer transformer = new DrillRelOptUtil.RexFieldsTransformer(builder, corrMap);
for (RexNode expr : exprs) {
outputExprs.add(transformer.go(expr));
}
return outputExprs;
}

private Map<Integer, Integer> buildMapWithoutCorrColumn(RelNode corr, int correlationIndex) { private Map<Integer, Integer> buildMapWithoutCorrColumn(RelNode corr, int correlationIndex) {
int index = 0; int index = 0;
Map<Integer, Integer> result = new HashMap(); Map<Integer, Integer> result = new HashMap();
Expand Down
Expand Up @@ -188,4 +188,26 @@ public boolean needsFinalColumnReordering() {
return true; return true;
} }


@Override
public Prel addImplicitRowIDCol(List<RelNode> children) {
List<Integer> groupingCols = Lists.newArrayList();
groupingCols.add(0);
for (int groupingCol : groupSet.asList()) {
groupingCols.add(groupingCol + 1);
}

ImmutableBitSet groupingSet = ImmutableBitSet.of(groupingCols);
List<ImmutableBitSet> groupingSets = Lists.newArrayList();
groupingSets.add(groupingSet);
List<AggregateCall> aggregateCalls = Lists.newArrayList();
for (AggregateCall aggCall : aggCalls) {
List<Integer> arglist = Lists.newArrayList();
for (int arg : aggCall.getArgList()) {
arglist.add(arg + 1);
}
aggregateCalls.add(AggregateCall.create(aggCall.getAggregation(), aggCall.isDistinct(),
aggCall.isApproximate(), arglist, aggCall.filterArg, aggCall.type, aggCall.name));
}
return (Prel) copy(traitSet, children.get(0),indicator,groupingSet,groupingSets, aggregateCalls);
}
} }
Expand Up @@ -71,4 +71,9 @@ public boolean needsFinalColumnReordering() {
return true; return true;
} }


@Override
public Prel addImplicitRowIDCol(List<RelNode> children) {
throw new UnsupportedOperationException("Adding Implicit RowID column is not supported for " +
this.getClass().getSimpleName() + " operator ");
}
} }
Expand Up @@ -18,11 +18,16 @@
package org.apache.drill.exec.planner.physical; package org.apache.drill.exec.planner.physical;


import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map;


import org.apache.calcite.rex.RexBuilder;
import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.planner.common.DrillFilterRelBase; import org.apache.drill.exec.planner.common.DrillFilterRelBase;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
Expand Down Expand Up @@ -81,4 +86,17 @@ public boolean needsFinalColumnReordering() {
return true; return true;
} }


@Override
public Prel addImplicitRowIDCol(List<RelNode> children) {
RexBuilder builder = this.getCluster().getRexBuilder();
return (Prel) this.copy(this.traitSet, children.get(0), DrillRelOptUtil.transformExpr(builder, condition, buildMap()));
}

private Map<Integer, Integer> buildMap() {
Map<Integer, Integer> map = new HashMap<>();
for (int i=0;i<this.getInput().getRowType().getFieldCount();i++) {
map.put(i, i+1);
}
return map;
}
} }
Expand Up @@ -83,5 +83,4 @@ public SelectionVectorMode[] getSupportedEncodings() {
public SelectionVectorMode getEncoding() { public SelectionVectorMode getEncoding() {
return SelectionVectorMode.NONE; return SelectionVectorMode.NONE;
} }

} }
Expand Up @@ -138,4 +138,9 @@ protected void buildJoinConditions(List<JoinCondition> conditions,
FieldReference.getWithQuotedRef(rightFields.get(pair.right)))); FieldReference.getWithQuotedRef(rightFields.get(pair.right))));
} }
} }

@Override
public Prel addImplicitRowIDCol(List<RelNode> children) {
throw new UnsupportedOperationException("Adding Implicit RowID column is not supported for " + this.getClass().getSimpleName() + " operator ");
}
} }
Expand Up @@ -72,7 +72,7 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
if (getColumn() != null) { if (getColumn() != null) {
excludedColumns.add(getColumn()); excludedColumns.add(getColumn());
} }
LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType(), excludedColumns); LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType(), DrillLateralJoinRelBase.IMPLICIT_COLUMN, excludedColumns);
return creator.addMetadata(this, ljoin); return creator.addMetadata(this, ljoin);
} }


Expand Down Expand Up @@ -156,4 +156,9 @@ public BatchSchema.SelectionVectorMode getEncoding() {
return BatchSchema.SelectionVectorMode.NONE; return BatchSchema.SelectionVectorMode.NONE;
} }


@Override
public Prel addImplicitRowIDCol(List<RelNode> children) {
throw new UnsupportedOperationException("Adding Implicit RowID column is not supported for " +
this.getClass().getSimpleName() + " operator ");
}
} }
Expand Up @@ -89,27 +89,8 @@ public boolean needsFinalColumnReordering() {
return true; return true;
} }


// @Override @Override
// public LogicalOperator implement(DrillImplementor implementor) { public Prel addImplicitRowIDCol(List<RelNode> children) {
// LogicalOperator inputOp = implementor.visitChild(this, 0, getInput()); return (Prel) this.copy(this.traitSet, children);
// }
// // First offset to include into results (inclusive). Null implies it is starting from offset 0
// int first = offset != null ? Math.max(0, RexLiteral.intValue(offset)) : 0;
//
// // Last offset to stop including into results (exclusive), translating fetch row counts into an offset.
// // Null value implies including entire remaining result set from first offset
// Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null;
// Limit limit = new Limit(first, last);
// limit.setInput(inputOp);
// return limit;
// }

// public static LimitPrel convert(Limit limit, ConversionContext context) throws InvalidRelException{
// RelNode input = context.toRel(limit.getInput());
// RexNode first = context.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit.getFirst()), context.getTypeFactory().createSqlType(SqlTypeName.INTEGER));
// RexNode last = context.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit.getLast()), context.getTypeFactory().createSqlType(SqlTypeName.INTEGER));
// return new LimitPrel(context.getCluster(), context.getLogicalTraits(), input, first, last);
// }


} }

0 comments on commit e1f4a63

Please sign in to comment.