Skip to content

Commit

Permalink
DRILL-3623: For limit 0 queries, optionally use a shorter execution path when result column types are known
Browse files Browse the repository at this point in the history

+ "planner.enable_limit0_optimization" option is disabled by default

+ Print plan in PlanTestBase if TEST_QUERY_PRINTING_SILENT is set
+ Fix DrillTestWrapper to verify expected and actual schema
+ Correct the schema of results in TestInbuiltHiveUDFs#testXpath_Double

This closes #405
  • Loading branch information
Sudheesh Katkam committed Mar 22, 2016
1 parent 600ba9e commit 5dbaafb
Show file tree
Hide file tree
Showing 13 changed files with 963 additions and 30 deletions.
Expand Up @@ -58,7 +58,7 @@ public void testXpath_Double() throws Exception {

final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
.setMinorType(TypeProtos.MinorType.FLOAT8)
.setMode(TypeProtos.DataMode.REQUIRED)
.setMode(TypeProtos.DataMode.OPTIONAL)
.build();

final List<Pair<SchemaPath, TypeProtos.MajorType>> expectedSchema = Lists.newArrayList();
Expand Down
Expand Up @@ -202,6 +202,9 @@ public interface ExecConstants {
String AFFINITY_FACTOR_KEY = "planner.affinity_factor";
OptionValidator AFFINITY_FACTOR = new DoubleValidator(AFFINITY_FACTOR_KEY, 1.2d);

// Boolean planner option for the early LIMIT 0 optimization; the validator's default value is false,
// i.e. the optimization is off unless a user turns it on.
String EARLY_LIMIT0_OPT_KEY = "planner.enable_limit0_optimization";
BooleanValidator EARLY_LIMIT0_OPT = new BooleanValidator(EARLY_LIMIT0_OPT_KEY, false);

String ENABLE_MEMORY_ESTIMATION_KEY = "planner.memory.enable_memory_estimation";
OptionValidator ENABLE_MEMORY_ESTIMATION = new BooleanValidator(ENABLE_MEMORY_ESTIMATION_KEY, false);

Expand Down
Expand Up @@ -17,13 +17,13 @@
*/
package org.apache.drill.exec.physical.base;


public class ScanStats {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanStats.class);

// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanStats.class);

public static final ScanStats TRIVIAL_TABLE = new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 20, 1, 1);

public static final ScanStats ZERO_RECORD_TABLE = new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, 0, 1, 1);

private final long recordCount;
private final float cpuCost;
private final float diskCost;
Expand Down
Expand Up @@ -65,6 +65,7 @@
import org.apache.drill.exec.planner.logical.partition.ParquetPruneScanRule;
import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
import org.apache.drill.exec.planner.physical.ConvertCountToDirectScan;
import org.apache.drill.exec.planner.physical.DirectScanPrule;
import org.apache.drill.exec.planner.physical.FilterPrule;
import org.apache.drill.exec.planner.physical.HashAggPrule;
import org.apache.drill.exec.planner.physical.HashJoinPrule;
Expand Down Expand Up @@ -391,6 +392,7 @@ static final RuleSet getPhysicalRules(OptimizerRulesContext optimizerRulesContex
ruleList.add(LimitUnionExchangeTransposeRule.INSTANCE);
ruleList.add(UnionAllPrule.INSTANCE);
ruleList.add(ValuesPrule.INSTANCE);
ruleList.add(DirectScanPrule.INSTANCE);

if (ps.isHashAggEnabled()) {
ruleList.add(HashAggPrule.INSTANCE);
Expand Down
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.logical;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.store.direct.DirectGroupScan;

/**
 * Drill logical rel node wrapping a {@link DirectGroupScan}. In contrast to {@link DrillScanRel}, no
 * {@link DrillTable} stands behind this node; the row type is supplied by whoever constructs it.
 */
public class DrillDirectScanRel extends AbstractRelNode implements DrillRel {

  private final DirectGroupScan groupScan;
  private final RelDataType rowType;

  /**
   * @param cluster         cluster handed to the {@link AbstractRelNode} constructor
   * @param traitSet        trait set handed to the {@link AbstractRelNode} constructor
   * @param directGroupScan group scan this node represents
   * @param rowType         row type reported by {@link #deriveRowType()}
   */
  public DrillDirectScanRel(RelOptCluster cluster, RelTraitSet traitSet, DirectGroupScan directGroupScan,
      RelDataType rowType) {
    super(cluster, traitSet);
    this.rowType = rowType;
    this.groupScan = directGroupScan;
  }

  /**
   * Always returns {@code null}; this node does not produce a {@link LogicalOperator}.
   */
  @Override
  public LogicalOperator implement(DrillImplementor implementor) {
    return null;
  }

  /**
   * @return the row type passed to the constructor
   */
  @Override
  public RelDataType deriveRowType() {
    return rowType;
  }

  /**
   * Adds the group scan's digest to the explain output under the key "directscan".
   */
  @Override
  public RelWriter explainTerms(RelWriter pw) {
    final RelWriter writer = super.explainTerms(pw);
    return writer.item("directscan", groupScan.getDigest());
  }

  /**
   * @return the record count taken from the group scan's scan stats
   */
  @Override
  public double getRows() {
    return groupScan.getScanStats(PrelUtil.getPlannerSettings(getCluster())).getRecordCount();
  }

  /**
   * @return the wrapped group scan
   */
  public DirectGroupScan getGroupScan() {
    return groupScan;
  }
}
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.physical;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.drill.exec.planner.logical.DrillDirectScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;

/**
 * Rule that matches a {@link DrillDirectScanRel} and replaces it with a {@link ScanPrel} built from the same
 * cluster, group scan and row type, with {@link Prel#DRILL_PHYSICAL} added to the trait set.
 */
public class DirectScanPrule extends Prule {

  public static final RelOptRule INSTANCE = new DirectScanPrule();

  public DirectScanPrule() {
    super(RelOptHelper.any(DrillDirectScanRel.class), "Prel.DirectScanPrule");
  }

  /**
   * Transforms the matched direct scan into a {@link ScanPrel} that reports it needs no final column
   * reordering.
   */
  @Override
  public void onMatch(RelOptRuleCall call) {
    final DrillDirectScanRel directScan = call.rel(0);
    final RelTraitSet physicalTraits = directScan.getTraitSet().plus(Prel.DRILL_PHYSICAL);

    call.transformTo(
        new ScanPrel(directScan.getCluster(), physicalTraits, directScan.getGroupScan(),
            directScan.getRowType()) {
          // A direct scan is not executed, so columns cannot be shuffled by accident and
          // there is nothing to put back in order.
          @Override
          public boolean needsFinalColumnReordering() {
            return false;
          }
        });
  }
}
Expand Up @@ -206,6 +206,18 @@ protected ConvertedRelNode validateAndConvert(SqlNode sqlNode) throws ForemanSet
* @throws RelConversionException
*/
protected DrillRel convertToDrel(final RelNode relNode) throws SqlUnsupportedException, RelConversionException {
if (context.getOptions().getOption(ExecConstants.EARLY_LIMIT0_OPT) &&
context.getPlannerSettings().isTypeInferenceEnabled() &&
FindLimit0Visitor.containsLimit0(relNode)) {
// disable distributed mode
context.getPlannerSettings().forceSingleMode();
// if the schema is known, return the schema directly
final DrillRel shorterPlan;
if ((shorterPlan = FindLimit0Visitor.getDirectScanRelIfFullySchemaed(relNode)) != null) {
return shorterPlan;
}
}

try {
final RelNode convertedRelNode;

Expand Down
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.drill.exec.planner.sql.handlers;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.logical.LogicalAggregate;
Expand All @@ -25,35 +29,104 @@
import org.apache.calcite.rel.logical.LogicalMinus;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.planner.logical.DrillDirectScanRel;
import org.apache.drill.exec.planner.logical.DrillLimitRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.direct.DirectGroupScan;

import java.util.List;

/**
* Visitor that will identify whether the root portion of the RelNode tree contains a limit 0 pattern. In this case, we
* inform the planner settings that this plan should be run as a single node plan to reduce the overhead associated with
* executing a schema-only query.
*/
public class FindLimit0Visitor extends RelShuttleImpl {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FindLimit0Visitor.class);
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FindLimit0Visitor.class);

// Some types are excluded in this set:
// + DECIMAL type is not fully supported in general.
// + VARBINARY is not fully tested.
// + MAP, ARRAY are currently not exposed to the planner.
// + TINYINT, SMALLINT are defined in the Drill type system but have been turned off for now.
// + SYMBOL, MULTISET, DISTINCT, STRUCTURED, ROW, OTHER, CURSOR, COLUMN_LIST are Calcite types
// currently not supported by Drill, nor defined in the Drill type list.
// + ANY is the late binding type.
private static final ImmutableSet<SqlTypeName> TYPES =
ImmutableSet.<SqlTypeName>builder()
.add(SqlTypeName.INTEGER, SqlTypeName.BIGINT, SqlTypeName.FLOAT, SqlTypeName.DOUBLE,
SqlTypeName.VARCHAR, SqlTypeName.BOOLEAN, SqlTypeName.DATE, SqlTypeName.TIME,
SqlTypeName.TIMESTAMP, SqlTypeName.INTERVAL_YEAR_MONTH, SqlTypeName.INTERVAL_DAY_TIME,
SqlTypeName.CHAR)
.build();

/**
* If all field types of the given node are {@link #TYPES recognized types} and honored by execution, then this
* method returns the tree: DrillDirectScanRel(field types). Otherwise, the method returns null.
*
* @param rel calcite logical rel tree
* @return drill logical rel tree
*/
public static DrillRel getDirectScanRelIfFullySchemaed(RelNode rel) {
final List<RelDataTypeField> fieldList = rel.getRowType().getFieldList();
final List<SqlTypeName> columnTypes = Lists.newArrayList();
final List<TypeProtos.DataMode> dataModes = Lists.newArrayList();

for (final RelDataTypeField field : fieldList) {
final SqlTypeName sqlTypeName = field.getType().getSqlTypeName();
if (!TYPES.contains(sqlTypeName)) {
return null;
} else {
columnTypes.add(sqlTypeName);
dataModes.add(field.getType().isNullable() ?
TypeProtos.DataMode.OPTIONAL : TypeProtos.DataMode.REQUIRED);
}
}

private boolean contains = false;
final RelTraitSet traits = rel.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
final RelDataTypeReader reader = new RelDataTypeReader(rel.getRowType().getFieldNames(), columnTypes,
dataModes);
return new DrillDirectScanRel(rel.getCluster(), traits,
new DirectGroupScan(reader, ScanStats.ZERO_RECORD_TABLE), rel.getRowType());
}

/**
 * Runs a fresh {@link FindLimit0Visitor} over the given tree and reports what it found.
 *
 * @param rel rel node tree
 * @return true if the root portion of the tree contains LIMIT(0)
 */
public static boolean containsLimit0(RelNode rel) {
  final FindLimit0Visitor limit0Finder = new FindLimit0Visitor();
  rel.accept(limit0Finder);
  return limit0Finder.contains;
}

// Flag reported by isContains(); starts out false. The code that updates it is not part of this excerpt.
private boolean contains = false;

// Private constructor: instances can only be created from inside this class.
private FindLimit0Visitor() {
}

/**
 * @return the current value of the {@code contains} flag
 */
boolean isContains() {
return contains;
}

private boolean isLimit0(RexNode fetch) {
private static boolean isLimit0(RexNode fetch) {
if (fetch != null && fetch.isA(SqlKind.LITERAL)) {
RexLiteral l = (RexLiteral) fetch;
switch (l.getTypeName()) {
Expand Down Expand Up @@ -116,4 +189,49 @@ public RelNode visit(LogicalMinus minus) {
public RelNode visit(LogicalUnion union) {
return union;
}

/**
 * Record reader that only publishes a schema: {@link #setup} registers one field per column name, using the
 * matching column type and data mode, and {@link #next()} always reports zero records.
 */
public static class RelDataTypeReader extends AbstractRecordReader {

  public final List<String> columnNames;
  public final List<SqlTypeName> columnTypes;
  public final List<TypeProtos.DataMode> dataModes;

  /**
   * @param columnNames names of the columns
   * @param columnTypes calcite type of each column, same size as {@code columnNames}
   * @param dataModes   data mode of each column, same size as {@code columnNames}
   * @throws IllegalArgumentException if the three lists differ in size
   */
  public RelDataTypeReader(List<String> columnNames, List<SqlTypeName> columnTypes,
      List<TypeProtos.DataMode> dataModes) {
    final boolean namesMatchTypes = columnNames.size() == columnTypes.size();
    Preconditions.checkArgument(namesMatchTypes && columnTypes.size() == dataModes.size());
    this.columnNames = columnNames;
    this.columnTypes = columnTypes;
    this.dataModes = dataModes;
  }

  /**
   * Adds one field per column to the output mutator.
   *
   * @throws ExecutionSetupException wrapping any {@link SchemaChangeException} raised while adding a field
   */
  @Override
  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
    for (int col = 0; col < columnNames.size(); col++) {
      final TypeProtos.DataMode mode = dataModes.get(col);
      final TypeProtos.MinorType minorType =
          TypeInferenceUtils.getDrillTypeFromCalciteType(columnTypes.get(col));
      final TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
          .setMode(mode)
          .setMinorType(minorType)
          .build();

      final MaterializedField column = MaterializedField.create(columnNames.get(col), majorType);
      // NOTE(review): raw Class kept as in the original; presumably needed for addField's generic
      // signature - confirm before parameterizing.
      final Class vectorClass = TypeHelper.getValueVectorClass(majorType.getMinorType(), majorType.getMode());
      try {
        output.addField(column, vectorClass);
      } catch (SchemaChangeException e) {
        throw new ExecutionSetupException(e);
      }
    }
  }

  /**
   * @return always 0; this reader never produces records
   */
  @Override
  public int next() {
    return 0;
  }

  /**
   * Does nothing.
   */
  @Override
  public void close() throws Exception {
  }
}
}
Expand Up @@ -116,6 +116,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
ExecConstants.SMALL_QUEUE_SIZE,
ExecConstants.MIN_HASH_TABLE_SIZE,
ExecConstants.MAX_HASH_TABLE_SIZE,
ExecConstants.EARLY_LIMIT0_OPT,
ExecConstants.ENABLE_MEMORY_ESTIMATION,
ExecConstants.MAX_QUERY_MEMORY_PER_NODE,
ExecConstants.NON_BLOCKING_OPERATORS_MEMORY,
Expand Down

0 comments on commit 5dbaafb

Please sign in to comment.