Skip to content

Commit

Permalink
DRILL-6118: Handle item star columns during project / filter push down and directory pruning
Browse files Browse the repository at this point in the history

1. Added DrillFilterItemStarReWriterRule to re-write item star fields to regular field references.
2. Refactored DrillPushProjectIntoScanRule to handle item star fields, factored out helper classes and methods from the PrelUtil class.
3. Fixed an issue with dynamic star usage: after the Calcite upgrade, the old star usage was still present, so WILDCARD was replaced with DYNAMIC_STAR for clarity.
4. Added unit tests to check project / filter push down and directory pruning with item star.
  • Loading branch information
arina-ielchiieva authored and Aman Sinha committed Feb 25, 2018
1 parent 50efb80 commit 9073aed
Show file tree
Hide file tree
Showing 27 changed files with 923 additions and 413 deletions.
Expand Up @@ -23,6 +23,7 @@
import org.apache.calcite.sql.SqlIntervalQualifier; import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeFamily; import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.planner.StarColumnHelper; import org.apache.drill.exec.planner.StarColumnHelper;
import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl; import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl;
import org.apache.drill.exec.planner.types.RelDataTypeHolder; import org.apache.drill.exec.planner.types.RelDataTypeHolder;
Expand Down Expand Up @@ -72,9 +73,9 @@ public FieldType(
@JsonProperty("fractionalSecondPrecision") Integer fractionalSecondPrecision, @JsonProperty("fractionalSecondPrecision") Integer fractionalSecondPrecision,
@JsonProperty("isNullable") Boolean isNullable) { @JsonProperty("isNullable") Boolean isNullable) {
// Fix for views which were created on Calcite 1.4. // Fix for views which were created on Calcite 1.4.
// After Calcite upgrade star "*" was changed on dynamic star "**" // After Calcite upgrade star "*" was changed on dynamic star "**" (SchemaPath.DYNAMIC_STAR)
// and type of star was changed to SqlTypeName.DYNAMIC_STAR // and type of star was changed to SqlTypeName.DYNAMIC_STAR
this.name = "*".equals(name) ? "**" : name; this.name = "*".equals(name) ? SchemaPath.DYNAMIC_STAR : name;
this.type = "*".equals(name) && type == SqlTypeName.ANY ? SqlTypeName.DYNAMIC_STAR : type; this.type = "*".equals(name) && type == SqlTypeName.ANY ? SqlTypeName.DYNAMIC_STAR : type;
this.precision = precision; this.precision = precision;
this.scale = scale; this.scale = scale;
Expand Down
Expand Up @@ -300,7 +300,7 @@ private boolean isWildcard(final NamedExpression ex) {
return false; return false;
} }
final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment(); final NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
return expr.getPath().contains(SchemaPath.WILDCARD); return expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
} }


private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException { private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
Expand Down Expand Up @@ -542,7 +542,7 @@ private boolean isClassificationNeeded(final List<NamedExpression> exprs) {
final NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment(); final NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
final NameSegment ref = ex.getRef().getRootSegment(); final NameSegment ref = ex.getRef().getRootSegment();
final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
final boolean exprContainsStar = expr.getPath().contains(SchemaPath.WILDCARD); final boolean exprContainsStar = expr.getPath().contains(SchemaPath.DYNAMIC_STAR);


if (refHasPrefix || exprContainsStar) { if (refHasPrefix || exprContainsStar) {
needed = true; needed = true;
Expand Down Expand Up @@ -596,10 +596,10 @@ private void classifyExpr(final NamedExpression ex, final RecordBatch incoming,
final NameSegment ref = ex.getRef().getRootSegment(); final NameSegment ref = ex.getRef().getRootSegment();
final boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); final boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); final boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
final boolean exprIsStar = expr.getPath().equals(SchemaPath.WILDCARD); final boolean exprIsStar = expr.getPath().equals(SchemaPath.DYNAMIC_STAR);
final boolean refContainsStar = ref.getPath().contains(SchemaPath.WILDCARD); final boolean refContainsStar = ref.getPath().contains(SchemaPath.DYNAMIC_STAR);
final boolean exprContainsStar = expr.getPath().contains(SchemaPath.WILDCARD); final boolean exprContainsStar = expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
final boolean refEndsWithStar = ref.getPath().endsWith(SchemaPath.WILDCARD); final boolean refEndsWithStar = ref.getPath().endsWith(SchemaPath.DYNAMIC_STAR);


String exprPrefix = EMPTY_STRING; String exprPrefix = EMPTY_STRING;
String exprSuffix = expr.getPath(); String exprSuffix = expr.getPath();
Expand Down
@@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
Expand Down Expand Up @@ -36,7 +36,8 @@ public ScanBatch getBatch(ExecutorFragmentContext context, Values config, List<R
throws ExecutionSetupException { throws ExecutionSetupException {
assert children.isEmpty(); assert children.isEmpty();


JSONRecordReader reader = new JSONRecordReader(context, config.getContent().asNode(), null, Collections.singletonList(SchemaPath.getSimplePath("*"))); JSONRecordReader reader = new JSONRecordReader(context, config.getContent().asNode(),
null, Collections.singletonList(SchemaPath.STAR_COLUMN));
return new ScanBatch(config, context, Collections.singletonList((RecordReader) reader)); return new ScanBatch(config, context, Collections.singletonList((RecordReader) reader));
} }
} }
Expand Up @@ -38,9 +38,10 @@
import org.apache.drill.exec.planner.logical.DrillProjectRule; import org.apache.drill.exec.planner.logical.DrillProjectRule;
import org.apache.drill.exec.planner.logical.DrillPushFilterPastProjectRule; import org.apache.drill.exec.planner.logical.DrillPushFilterPastProjectRule;
import org.apache.drill.exec.planner.logical.DrillPushLimitToScanRule; import org.apache.drill.exec.planner.logical.DrillPushLimitToScanRule;
import org.apache.drill.exec.planner.logical.DrillPushProjIntoScan; import org.apache.drill.exec.planner.logical.DrillPushProjectIntoScanRule;
import org.apache.drill.exec.planner.logical.DrillPushProjectPastFilterRule; import org.apache.drill.exec.planner.logical.DrillPushProjectPastFilterRule;
import org.apache.drill.exec.planner.logical.DrillPushProjectPastJoinRule; import org.apache.drill.exec.planner.logical.DrillPushProjectPastJoinRule;
import org.apache.drill.exec.planner.logical.DrillFilterItemStarReWriterRule;
import org.apache.drill.exec.planner.logical.DrillReduceAggregatesRule; import org.apache.drill.exec.planner.logical.DrillReduceAggregatesRule;
import org.apache.drill.exec.planner.logical.DrillReduceExpressionsRule; import org.apache.drill.exec.planner.logical.DrillReduceExpressionsRule;
import org.apache.drill.exec.planner.logical.DrillRelFactories; import org.apache.drill.exec.planner.logical.DrillRelFactories;
Expand Down Expand Up @@ -276,7 +277,7 @@ static RuleSet getDrillUserConfigurableLogicalRules(OptimizerRulesContext optimi
// Due to infinite loop in planning (DRILL-3257), temporarily disable this rule // Due to infinite loop in planning (DRILL-3257), temporarily disable this rule
//DrillProjectSetOpTransposeRule.INSTANCE, //DrillProjectSetOpTransposeRule.INSTANCE,
RuleInstance.PROJECT_WINDOW_TRANSPOSE_RULE, RuleInstance.PROJECT_WINDOW_TRANSPOSE_RULE,
DrillPushProjIntoScan.INSTANCE, DrillPushProjectIntoScanRule.INSTANCE,


/* /*
Convert from Calcite Logical to Drill Logical Rules. Convert from Calcite Logical to Drill Logical Rules.
Expand Down Expand Up @@ -336,6 +337,7 @@ static RuleSet getDrillBasicRules(OptimizerRulesContext optimizerRulesContext) {
static RuleSet getPruneScanRules(OptimizerRulesContext optimizerRulesContext) { static RuleSet getPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder() final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder()
.add( .add(
DrillFilterItemStarReWriterRule.INSTANCE,
PruneScanRule.getDirFilterOnProject(optimizerRulesContext), PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
PruneScanRule.getDirFilterOnScan(optimizerRulesContext), PruneScanRule.getDirFilterOnScan(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext), ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext),
Expand Down Expand Up @@ -376,6 +378,7 @@ static RuleSet getPhysicalPruneScanRules(OptimizerRulesContext optimizerRulesCon
static RuleSet getDirPruneScanRules(OptimizerRulesContext optimizerRulesContext) { static RuleSet getDirPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder() final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder()
.add( .add(
DrillFilterItemStarReWriterRule.INSTANCE,
PruneScanRule.getDirFilterOnProject(optimizerRulesContext), PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
PruneScanRule.getDirFilterOnScan(optimizerRulesContext) PruneScanRule.getDirFilterOnScan(optimizerRulesContext)
) )
Expand Down
Expand Up @@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */

package org.apache.drill.exec.planner; package org.apache.drill.exec.planner;


import java.util.List; import java.util.List;
Expand All @@ -29,7 +28,7 @@ public class StarColumnHelper {


public final static String PREFIX_DELIMITER = "\u00a6\u00a6"; public final static String PREFIX_DELIMITER = "\u00a6\u00a6";


public final static String PREFIXED_STAR_COLUMN = PREFIX_DELIMITER + SchemaPath.WILDCARD; public final static String PREFIXED_STAR_COLUMN = PREFIX_DELIMITER + SchemaPath.DYNAMIC_STAR;


public static boolean containsStarColumn(RelDataType type) { public static boolean containsStarColumn(RelDataType type) {
if (! type.isStruct()) { if (! type.isStruct()) {
Expand All @@ -38,8 +37,8 @@ public static boolean containsStarColumn(RelDataType type) {


List<String> fieldNames = type.getFieldNames(); List<String> fieldNames = type.getFieldNames();


for (String s : fieldNames) { for (String fieldName : fieldNames) {
if (s.startsWith(SchemaPath.WILDCARD)) { if (SchemaPath.DYNAMIC_STAR.equals(fieldName)) {
return true; return true;
} }
} }
Expand All @@ -48,15 +47,15 @@ public static boolean containsStarColumn(RelDataType type) {
} }


public static boolean containsStarColumnInProject(RelDataType inputRowType, List<RexNode> projExprs) { public static boolean containsStarColumnInProject(RelDataType inputRowType, List<RexNode> projExprs) {
if (! inputRowType.isStruct()) { if (!inputRowType.isStruct()) {
return false; return false;
} }


for (RexNode expr : projExprs) { for (RexNode expr : projExprs) {
if (expr instanceof RexInputRef) { if (expr instanceof RexInputRef) {
String name = inputRowType.getFieldNames().get(((RexInputRef) expr).getIndex()); String name = inputRowType.getFieldNames().get(((RexInputRef) expr).getIndex());


if (name.startsWith(SchemaPath.WILDCARD)) { if (SchemaPath.DYNAMIC_STAR.equals(name)) {
return true; return true;
} }
} }
Expand All @@ -70,7 +69,7 @@ public static boolean isPrefixedStarColumn(String fieldName) {
} }


public static boolean isNonPrefixedStarColumn(String fieldName) { public static boolean isNonPrefixedStarColumn(String fieldName) {
return fieldName.startsWith(SchemaPath.WILDCARD); return SchemaPath.DYNAMIC_STAR.equals(fieldName);
} }


public static boolean isStarColumn(String fieldName) { public static boolean isStarColumn(String fieldName) {
Expand Down
@@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
Expand All @@ -18,12 +18,12 @@
package org.apache.drill.exec.planner.common; package org.apache.drill.exec.planner.common;


import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.plan.RelTraitSet;
import org.apache.drill.exec.util.Utilities;


/** /**
* Base class for logical scan rel implemented in Drill. * Base class for logical scan rel implemented in Drill.
Expand All @@ -35,11 +35,7 @@ public abstract class DrillScanRelBase extends TableScan implements DrillRelNode


public DrillScanRelBase(Convention convention, RelOptCluster cluster, RelTraitSet traits, RelOptTable table) { public DrillScanRelBase(Convention convention, RelOptCluster cluster, RelTraitSet traits, RelOptTable table) {
super(cluster, traits, table); super(cluster, traits, table);
DrillTable unwrap = table.unwrap(DrillTable.class); this.drillTable = Utilities.getDrillTable(table);
if (unwrap == null) {
unwrap = table.unwrap(DrillTranslatableTable.class).getDrillTable();
}
this.drillTable = unwrap;
assert drillTable != null; assert drillTable != null;
} }


Expand Down

0 comments on commit 9073aed

Please sign in to comment.