diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java index e1eb25e5e91..e531f3806ea 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java @@ -19,6 +19,7 @@ import io.netty.buffer.DrillBuf; +import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.util.BitSets; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -27,6 +28,7 @@ import org.apache.drill.exec.planner.AbstractPartitionDescriptor; import org.apache.drill.exec.planner.PartitionDescriptor; import org.apache.drill.exec.planner.PartitionLocation; +import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.store.hive.HiveUtilities; @@ -89,27 +91,6 @@ public String getBaseTableLocation() { return origEntry.table.getTable().getSd().getLocation(); } - @Override - public GroupScan createNewGroupScan(List newFiles) throws ExecutionSetupException { - HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); - HiveReadEntry origReadEntry = hiveScan.hiveReadEntry; - List oldPartitions = origReadEntry.partitions; - List newPartitions = new LinkedList<>(); - - for (HiveTable.HivePartition part: oldPartitions) { - String partitionLocation = part.getPartition().getSd().getLocation(); - for (String newPartitionLocation: newFiles) { - if (partitionLocation.equals(newPartitionLocation)) { - newPartitions.add(part); - } - } - } - - HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions); - - return hiveScan.clone(newReadEntry); - } - 
@Override public void populatePartitionVectors(ValueVector[] vectors, List partitions, BitSet partitionColumnBitSet, Map fieldNameMap) { @@ -169,4 +150,37 @@ protected void createPartitionSublists() { sublistsCreated = true; } + @Override + public TableScan createTableScan(List newPartitions) throws Exception { + GroupScan newGroupScan = createNewGroupScan(newPartitions); + return new DrillScanRel(scanRel.getCluster(), + scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + scanRel.getTable(), + newGroupScan, + scanRel.getRowType(), + scanRel.getColumns(), + true /*filter pushdown*/); + } + + private GroupScan createNewGroupScan(List newFiles) throws ExecutionSetupException { + HiveScan hiveScan = (HiveScan) scanRel.getGroupScan(); + HiveReadEntry origReadEntry = hiveScan.hiveReadEntry; + List oldPartitions = origReadEntry.partitions; + List newPartitions = new LinkedList<>(); + + for (HiveTable.HivePartition part: oldPartitions) { + String partitionLocation = part.getPartition().getSd().getLocation(); + for (String newPartitionLocation: newFiles) { + if (partitionLocation.equals(newPartitionLocation)) { + newPartitions.add(part); + } + } + } + + HiveReadEntry newReadEntry = new HiveReadEntry(origReadEntry.table, newPartitions); + + return hiveScan.clone(newReadEntry); + } + + } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java index 17cd65af4dc..0bdfe9955d9 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/HivePushPartitionFilterIntoScan.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.planner.sql.logical; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableScan; 
import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.PartitionDescriptor; @@ -44,7 +45,7 @@ public static final StoragePluginOptimizerRule getFilterOnProject(OptimizerRules optimizerRulesContext) { @Override - public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) { + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) { return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(), defaultPartitionValue); } @@ -63,9 +64,9 @@ public boolean matches(RelOptRuleCall call) { @Override public void onMatch(RelOptRuleCall call) { - final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); - final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1); - final DrillScanRel scanRel = (DrillScanRel) call.rel(2); + final DrillFilterRel filterRel = call.rel(0); + final DrillProjectRel projectRel = call.rel(1); + final DrillScanRel scanRel = call.rel(2); doOnMatch(call, filterRel, projectRel, scanRel); } }; @@ -78,7 +79,7 @@ public static final StoragePluginOptimizerRule getFilterOnScan(OptimizerRulesCon "HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", optimizerRulesContext) { @Override - public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) { + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) { return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(), defaultPartitionValue); } @@ -97,8 +98,8 @@ public boolean matches(RelOptRuleCall call) { @Override public void onMatch(RelOptRuleCall call) { - final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); - final DrillScanRel scanRel = (DrillScanRel) call.rel(1); + final DrillFilterRel filterRel = call.rel(0); + final DrillScanRel scanRel = 
call.rel(1); doOnMatch(call, filterRel, null, scanRel); } }; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java index 02658ed033e..04a3f970091 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java @@ -24,11 +24,14 @@ import java.util.Map; import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.calcite.adapter.enumerable.EnumerableTableScan; +import org.apache.calcite.prepare.RelOptTableImpl; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.util.BitSets; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.SchemaPath; @@ -36,7 +39,10 @@ import org.apache.drill.common.types.Types; import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.planner.logical.DrillTranslatableTable; import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.store.dfs.FileSelection; @@ -53,14 +59,22 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor { private final String partitionLabel; private final int partitionLabelLength; private final Map partitions = Maps.newHashMap(); - private final EnumerableTableScan scanRel; - private final DynamicDrillTable table; + private 
final TableScan scanRel; + private final DrillTable table; - public FileSystemPartitionDescriptor(PlannerSettings settings, RelNode scanRel) { + public FileSystemPartitionDescriptor(PlannerSettings settings, TableScan scanRel) { + Preconditions.checkArgument(scanRel instanceof DrillScanRel || scanRel instanceof EnumerableTableScan); this.partitionLabel = settings.getFsPartitionColumnLabel(); this.partitionLabelLength = partitionLabel.length(); - this.scanRel = (EnumerableTableScan) scanRel; - table = scanRel.getTable().unwrap(DynamicDrillTable.class); + this.scanRel = scanRel; + DrillTable unwrap; + unwrap = scanRel.getTable().unwrap(DrillTable.class); + if (unwrap == null) { + unwrap = scanRel.getTable().unwrap(DrillTranslatableTable.class).getDrillTable(); + } + + table = unwrap; + for(int i =0; i < 10; i++){ partitions.put(partitionLabel + i, i); } @@ -87,18 +101,18 @@ public int getMaxHierarchyLevel() { return MAX_NESTED_SUBDIRS; } - @Override - public GroupScan createNewGroupScan(List newFiles) throws IOException { - /* - THIS NEEDS TO CHANGE. WE SHOULD RETURN A ENUMERABLETABLESCAN?? 
- final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true); - final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection); - return newScan; - */ - return null; - } - - public DynamicDrillTable getTable() { +// @Override +// public GroupScan createNewGroupScan(List newFiles) throws IOException { +// if (scanRel instanceof DrillScanRel) { +// final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation()); +// final FileGroupScan newScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection); +// return newScan; +// } else { +// throw new UnsupportedOperationException("Does not allow to get groupScan for EnumerableTableScan"); +// } +// } + + public DrillTable getTable() { return table; } @@ -152,4 +166,37 @@ protected void createPartitionSublists() { sublistsCreated = true; } + @Override + public TableScan createTableScan(List newFiles) throws Exception { + if (scanRel instanceof DrillScanRel) { + final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation()); + final FileGroupScan newGroupScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection); + return new DrillScanRel(scanRel.getCluster(), + scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + scanRel.getTable(), + newGroupScan, + scanRel.getRowType(), + ((DrillScanRel) scanRel).getColumns(), + true /*filter pushdown*/); + } else if (scanRel instanceof EnumerableTableScan) { + return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles); + } else { + throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!"); + } + } + + private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List newFiles) { + final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable(); + final FormatSelection formatSelection = (FormatSelection) table.getSelection(); 
+ final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation()); + final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection); + final DrillTranslatableTable newTable = new DrillTranslatableTable( + new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(), + table.getUserName(), + newFormatSelection)); + final RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable); + + return EnumerableTableScan.create(oldScan.getCluster(), newOptTableImpl); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java index cda5a5eed74..81bcf03433a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java @@ -17,11 +17,13 @@ */ package org.apache.drill.exec.planner; +import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.util.BitSets; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.store.dfs.FileSelection; @@ -78,8 +80,7 @@ public int getMaxHierarchyLevel() { return partitionColumns.size(); } - @Override - public GroupScan createNewGroupScan(List newFiles) throws IOException { + private GroupScan createNewGroupScan(List newFiles) throws IOException { final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation()); final FileGroupScan 
newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection); return newScan; @@ -128,4 +129,16 @@ protected void createPartitionSublists() { sublistsCreated = true; } + @Override + public TableScan createTableScan(List newFiles) throws Exception { + final GroupScan newGroupScan = createNewGroupScan(newFiles); + + return new DrillScanRel(scanRel.getCluster(), + scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + scanRel.getTable(), + newGroupScan, + scanRel.getRowType(), + scanRel.getColumns(), + true /*filter pushdown*/); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java index 726d8bc5096..dd3b084c5fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.planner; +import org.apache.calcite.rel.core.TableScan; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.physical.base.GroupScan; @@ -53,8 +54,6 @@ public interface PartitionDescriptor extends Iterable> { // Maximum level of partition nesting/ hierarchy supported public int getMaxHierarchyLevel(); - public GroupScan createNewGroupScan(List newFiles) throws Exception; - /** * Method creates an in memory representation of all the partitions. For each level of partitioning we * will create a value vector which this method will populate for all the partitions with the values of the @@ -74,4 +73,13 @@ void populatePartitionVectors(ValueVector[] vectors, List par * @return */ TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings); + + /** + * Methods create a new TableScan rel node, given the lists of new partitions or new files to SCAN. 
+ * @param newPartitions + * @return + * @throws Exception + */ + public TableScan createTableScan(List newPartitions) throws Exception; + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java index 1fd1cd77f71..33c840b44ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java @@ -54,8 +54,14 @@ public void onMatch(RelOptRuleCall call) { try { ProjectPushInfo columnInfo = PrelUtil.getColumns(scan.getRowType(), proj.getProjects()); + // get DrillTable, either wrapped in RelOptTable, or DrillTranslatableTable. + DrillTable table = scan.getTable().unwrap(DrillTable.class); + if (table == null) { + table = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable(); + } + if (columnInfo == null || columnInfo.isStarQuery() // - || !scan.getTable().unwrap(DrillTable.class) // + || !table // .getGroupScan().canPushdownProjects(columnInfo.columns)) { return; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index 6f1f995ad64..d9609d2bd69 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -204,8 +204,8 @@ public static RuleSet getDrillBasicRules(OptimizerRulesContext optimizerRulesCon public static RuleSet getPruneScanRules(OptimizerRulesContext optimizerRulesContext) { final ImmutableSet pruneRules = ImmutableSet.builder() .add( - PruneScanRule.getFilterOnProject(optimizerRulesContext), - PruneScanRule.getFilterOnScan(optimizerRulesContext), + 
PruneScanRule.getDirFilterOnProject(optimizerRulesContext), + PruneScanRule.getDirFilterOnScan(optimizerRulesContext), ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext), ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext) ) @@ -214,6 +214,23 @@ public static RuleSet getPruneScanRules(OptimizerRulesCont return new DrillRuleSet(pruneRules); } + /** + * Get an immutable list of directory-based partition pruning rules that will be used in Calcite logical planning. + * @param optimizerRulesContext + * @return + */ + public static RuleSet getDirPruneScanRules(OptimizerRulesContext optimizerRulesContext) { + final ImmutableSet pruneRules = ImmutableSet.builder() + .add( + PruneScanRule.getDirFilterOnProject(optimizerRulesContext), + PruneScanRule.getDirFilterOnScan(optimizerRulesContext) + ) + .build(); + + return new DrillRuleSet(pruneRules); + + } + + // Ruleset for join permutation, used only in VolcanoPlanner. public static RuleSet getJoinPermRules(OptimizerRulesContext optimizerRulesContext) { return new DrillRuleSet(ImmutableSet. 
builder().add( // diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java index d92d2b0f3d4..f5dbb9d87d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java @@ -20,6 +20,7 @@ import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.TableScan; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.GroupScan; @@ -42,13 +43,13 @@ public static final RelOptRule getFilterOnProjectParquet(OptimizerRulesContext o optimizerRulesContext) { @Override - public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) { + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) { return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel); } @Override public boolean matches(RelOptRuleCall call) { - final DrillScanRel scan = (DrillScanRel) call.rel(2); + final DrillScanRel scan = call.rel(2); GroupScan groupScan = scan.getGroupScan(); // this rule is applicable only for parquet based partition pruning if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) { @@ -60,9 +61,9 @@ public boolean matches(RelOptRuleCall call) { @Override public void onMatch(RelOptRuleCall call) { - final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); - final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1); - final DrillScanRel scanRel = (DrillScanRel) call.rel(2); + final DrillFilterRel filterRel = call.rel(0); + final 
DrillProjectRel projectRel = call.rel(1); + final DrillScanRel scanRel = call.rel(2); doOnMatch(call, filterRel, projectRel, scanRel); } }; @@ -74,13 +75,13 @@ public static final RelOptRule getFilterOnScanParquet(OptimizerRulesContext opti "PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) { @Override - public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) { + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) { return new ParquetPartitionDescriptor(settings, (DrillScanRel) scanRel); } @Override public boolean matches(RelOptRuleCall call) { - final DrillScanRel scan = (DrillScanRel) call.rel(1); + final DrillScanRel scan = call.rel(1); GroupScan groupScan = scan.getGroupScan(); // this rule is applicable only for parquet based partition pruning if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) { @@ -92,8 +93,8 @@ public boolean matches(RelOptRuleCall call) { @Override public void onMatch(RelOptRuleCall call) { - final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); - final DrillScanRel scanRel = (DrillScanRel) call.rel(1); + final DrillFilterRel filterRel = call.rel(0); + final DrillScanRel scanRel = call.rel(1); doOnMatch(call, filterRel, null, scanRel); } }; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java index bfd65977bb7..aefd247d518 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java @@ -25,8 +25,12 @@ import com.google.common.base.Stopwatch; import org.apache.calcite.adapter.enumerable.EnumerableTableScan; +import org.apache.calcite.jdbc.CalciteAbstractSchema; +import 
org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.prepare.RelOptTableImpl; +import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rex.RexUtil; import org.apache.calcite.util.BitSets; @@ -55,6 +59,8 @@ import org.apache.drill.exec.planner.logical.DrillProjectRel; import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.planner.logical.DrillTranslatableTable; import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.PlannerSettings; @@ -64,6 +70,7 @@ import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.store.parquet.ParquetFileSelection; import org.apache.drill.exec.vector.NullableBitVector; import org.apache.calcite.rel.RelNode; import org.apache.calcite.plan.RelOptRule; @@ -87,282 +94,64 @@ public PruneScanRule(RelOptRuleOperand operand, String id, OptimizerRulesContext this.optimizerContext = optimizerContext; } - public static final RelOptRule getFilterOnProject(OptimizerRulesContext optimizerRulesContext) { - return new PruneScanRule( - RelOptHelper.some(LogicalFilter.class, RelOptHelper.some(Project.class, RelOptHelper.any(EnumerableTableScan.class))), - "PruneScanRule:Filter_On_Project", - optimizerRulesContext) { - - @Override - public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) { - return new FileSystemPartitionDescriptor(settings, scanRel); - } - - @Override - public boolean matches(RelOptRuleCall call) { - /* - final DrillScanRel scan = 
(DrillScanRel) call.rel(2); - GroupScan groupScan = scan.getGroupScan(); - // this rule is applicable only for dfs based partition pruning - return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown(); - */ - return true; - } - - @Override - public void onMatch(RelOptRuleCall call) { - final LogicalFilter filterRel = call.rel(0); - final Project projectRel = call.rel(1); - final EnumerableTableScan scanRel = call.rel(2); - doOnMatchLogical(call, filterRel, projectRel, scanRel); - } - }; - } - - public static final RelOptRule getFilterOnScan(OptimizerRulesContext optimizerRulesContext) { - return new PruneScanRule( - RelOptHelper.some(LogicalFilter.class, RelOptHelper.any(EnumerableTableScan.class)), - "PruneScanRule:Filter_On_Scan", optimizerRulesContext) { - - @Override - public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) { - return new FileSystemPartitionDescriptor(settings, scanRel); - } - - @Override - public boolean matches(RelOptRuleCall call) { - /* final DrillScanRel scan = (DrillScanRel) call.rel(1); - GroupScan groupScan = scan.getGroupScan(); - // this rule is applicable only for dfs based partition pruning - return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown(); - */ - return true; - } - - @Override - public void onMatch(RelOptRuleCall call) { - final LogicalFilter filterRel = call.rel(0); - final EnumerableTableScan scanRel = call.rel(1); - doOnMatchLogical(call, filterRel, null, scanRel); - } - }; - } - - // TODO: Combine the doOnMatch and doOnMatchLogical - protected void doOnMatchLogical(RelOptRuleCall call, LogicalFilter filterRel, Project projectRel, EnumerableTableScan scanRel) { - final String pruningClassName = getClass().getName(); - logger.info("Beginning partition pruning, pruning class: {}", pruningClassName); - Stopwatch totalPruningTime = new Stopwatch(); - totalPruningTime.start(); - - - final PlannerSettings settings = 
PrelUtil.getPlannerSettings(call.getPlanner()); - PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel); - final BufferAllocator allocator = optimizerContext.getAllocator(); - - - RexNode condition = null; - if (projectRel == null) { - condition = filterRel.getCondition(); - } else { - // get the filter as if it were below the projection. - condition = RelOptUtil.pushFilterPastProject(filterRel.getCondition(), projectRel); + private static class DirPruneScanFilterOnProjectRule extends PruneScanRule { + public DirPruneScanFilterOnProjectRule(OptimizerRulesContext optimizerRulesContext) { + super(RelOptHelper.some(Filter.class, RelOptHelper.some(Project.class, RelOptHelper.any(TableScan.class))), "DirPruneScanRule:Filter_On_Project", optimizerRulesContext); } - RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder()); - condition = condition.accept(visitor); - - Map fieldNameMap = Maps.newHashMap(); - List fieldNames = scanRel.getRowType().getFieldNames(); - BitSet columnBitset = new BitSet(); - BitSet partitionColumnBitSet = new BitSet(); - - int relColIndex = 0; - for (String field : fieldNames) { - final Integer partitionIndex = descriptor.getIdIfValid(field); - if (partitionIndex != null) { - fieldNameMap.put(partitionIndex, field); - partitionColumnBitSet.set(partitionIndex); - columnBitset.set(relColIndex); - } - relColIndex++; + @Override + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) { + return new FileSystemPartitionDescriptor(settings, scanRel); } - if (partitionColumnBitSet.isEmpty()) { - logger.info("No partition columns are projected from the scan..continue. 
" + - "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS)); - return; + @Override + public boolean matches(RelOptRuleCall call) { + final TableScan scan = call.rel(2); + return isQualifiedDirPruning(scan); } - // stop watch to track how long we spend in different phases of pruning - Stopwatch miscTimer = new Stopwatch(); - - // track how long we spend building the filter tree - miscTimer.start(); - - FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder()); - c.analyze(condition); - RexNode pruneCondition = c.getFinalCondition(); - - logger.info("Total elapsed time to build and analyze filter tree: {} ms", - miscTimer.elapsed(TimeUnit.MILLISECONDS)); - miscTimer.reset(); - - if (pruneCondition == null) { - logger.info("No conditions were found eligible for partition pruning." + - "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS)); - return; + @Override + public void onMatch(RelOptRuleCall call) { + final Filter filterRel = call.rel(0); + final Project projectRel = call.rel(1); + final TableScan scanRel = call.rel(2); + doOnMatch(call, filterRel, projectRel, scanRel); } + } - // set up the partitions - List newFiles = Lists.newArrayList(); - long numTotal = 0; // total number of partitions - int batchIndex = 0; - String firstLocation = null; - LogicalExpression materializedExpr = null; - - // Outer loop: iterate over a list of batches of PartitionLocations - for (List partitions : descriptor) { - numTotal += partitions.size(); - logger.debug("Evaluating partition pruning for batch {}", batchIndex); - if (batchIndex == 0) { // save the first location in case everything is pruned - firstLocation = partitions.get(0).getEntirePartitionLocation(); - } - final NullableBitVector output = new NullableBitVector(MaterializedField.create("", Types.optional(MinorType.BIT)), allocator); - final VectorContainer container = new VectorContainer(); - - try { - 
final ValueVector[] vectors = new ValueVector[descriptor.getMaxHierarchyLevel()]; - for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) { - SchemaPath column = SchemaPath.getSimplePath(fieldNameMap.get(partitionColumnIndex)); - MajorType type = descriptor.getVectorType(column, settings); - MaterializedField field = MaterializedField.create(column, type); - ValueVector v = TypeHelper.getNewVector(field, allocator); - v.allocateNew(); - vectors[partitionColumnIndex] = v; - container.add(v); - } - - // track how long we spend populating partition column vectors - miscTimer.start(); - - // populate partition vectors. - descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap); - - logger.info("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}", - miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex); - miscTimer.reset(); - - // materialize the expression; only need to do this once - if (batchIndex == 0) { - materializedExpr = materializePruneExpr(pruneCondition, settings, scanRel, container); - if (materializedExpr == null) { - // continue without partition pruning; no need to log anything here since - // materializePruneExpr logs it already - logger.info("Total pruning elapsed time: {} ms", - totalPruningTime.elapsed(TimeUnit.MILLISECONDS)); - return; - } - } - - output.allocateNew(partitions.size()); - - // start the timer to evaluate how long we spend in the interpreter evaluation - miscTimer.start(); - - InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr); - - logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {}", - miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex); - miscTimer.reset(); - - int recordCount = 0; - int qualifiedCount = 0; - - // Inner loop: within each batch iterate over the PartitionLocations - for(PartitionLocation part: partitions){ - if(!output.getAccessor().isNull(recordCount) 
&& output.getAccessor().get(recordCount) == 1){ - newFiles.add(part.getEntirePartitionLocation()); - qualifiedCount++; - } - recordCount++; - } - logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount); - batchIndex++; - } catch (Exception e) { - logger.warn("Exception while trying to prune partition.", e); - logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS)); - return; // continue without partition pruning - } finally { - container.clear(); - if (output != null) { - output.clear(); - } - } + private static class DirPruneScanFilterOnScanRule extends PruneScanRule { + public DirPruneScanFilterOnScanRule(OptimizerRulesContext optimizerRulesContext) { + super(RelOptHelper.some(Filter.class, RelOptHelper.any(TableScan.class)), "DirPruneScanRule:Filter_On_Scan", optimizerRulesContext); } - try { - - boolean canDropFilter = true; - - if (newFiles.isEmpty()) { - assert firstLocation != null; - newFiles.add(firstLocation); - canDropFilter = false; - } - - if (newFiles.size() == numTotal) { - logger.info("No partitions were eligible for pruning"); - return; - } - - logger.info("Pruned {} partitions down to {}", numTotal, newFiles.size()); - - List conjuncts = RelOptUtil.conjunctions(condition); - List pruneConjuncts = RelOptUtil.conjunctions(pruneCondition); - conjuncts.removeAll(pruneConjuncts); - RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false); - - RewriteCombineBinaryOperators reverseVisitor = new RewriteCombineBinaryOperators(true, filterRel.getCluster().getRexBuilder()); - - condition = condition.accept(reverseVisitor); - pruneCondition = pruneCondition.accept(reverseVisitor); - - RelOptTableImpl t = (RelOptTableImpl) scanRel.getTable(); - DynamicDrillTable oldTable = ((FileSystemPartitionDescriptor) descriptor).getTable(); - FormatSelection formatSelection = (FormatSelection) oldTable.getSelection(); - 
FileSelection oldFileSelection = formatSelection.getSelection(); - FileSelection newFileSelection = new FileSelection(newFiles, oldFileSelection.selectionRoot, oldFileSelection.getParquetMetadata(), oldFileSelection.getFileStatuses()); - FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection); - DynamicDrillTable newTable = new DynamicDrillTable(oldTable.getPlugin(), oldTable.getStorageEngineName(), - oldTable.getUserName(), newFormatSelection); - RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable); - - // TODO: The new scan should come from the PartitionDescriptor - // TODO: Update the PartitionDescriptor to return ScanRel instead of the GroupScan - EnumerableTableScan newScan = EnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl); + @Override + public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) { + return new FileSystemPartitionDescriptor(settings, scanRel); + } - RelNode inputRel = newScan; + @Override + public boolean matches(RelOptRuleCall call) { + final TableScan scan = call.rel(1); + return isQualifiedDirPruning(scan); + } - if (projectRel != null) { - inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel)); - } + @Override + public void onMatch(RelOptRuleCall call) { + final Filter filterRel = call.rel(0); + final TableScan scanRel = call.rel(1); + doOnMatch(call, filterRel, null, scanRel); + } + } - if (newCondition.isAlwaysTrue() && canDropFilter) { - call.transformTo(inputRel); - } else { - final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel)); - call.transformTo(newFilter); - } + public static final RelOptRule getDirFilterOnProject(OptimizerRulesContext optimizerRulesContext) { + return new DirPruneScanFilterOnProjectRule(optimizerRulesContext); + } - } catch (Exception e) { - logger.warn("Exception while using the 
pruned partitions.", e); - } finally { - logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS)); - } + public static final RelOptRule getDirFilterOnScan(OptimizerRulesContext optimizerRulesContext) { + return new DirPruneScanFilterOnScanRule(optimizerRulesContext); } - protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillProjectRel projectRel, DrillScanRel scanRel) { + protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) { final String pruningClassName = getClass().getName(); logger.info("Beginning partition pruning, pruning class: {}", pruningClassName); Stopwatch totalPruningTime = new Stopwatch(); @@ -373,7 +162,6 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel); final BufferAllocator allocator = optimizerContext.getAllocator(); - RexNode condition = null; if (projectRel == null) { condition = filterRel.getCondition(); @@ -541,16 +329,7 @@ protected void doOnMatch(RelOptRuleCall call, DrillFilterRel filterRel, DrillPro condition = condition.accept(reverseVisitor); pruneCondition = pruneCondition.accept(reverseVisitor); - final DrillScanRel newScanRel = - new DrillScanRel(scanRel.getCluster(), - scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - scanRel.getTable(), - descriptor.createNewGroupScan(newFiles), - scanRel.getRowType(), - scanRel.getColumns(), - true /*filter pushdown*/); - - RelNode inputRel = newScanRel; + RelNode inputRel = descriptor.createTableScan(newFiles); if (projectRel != null) { inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel)); @@ -602,5 +381,28 @@ protected OptimizerRulesContext getOptimizerRulesContext() { return optimizerContext; } - public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel); + public abstract PartitionDescriptor 
getPartitionDescriptor(PlannerSettings settings, TableScan scanRel); + + private static boolean isQualifiedDirPruning(final TableScan scan) { + if (scan instanceof EnumerableTableScan) { + DrillTable drillTable; + drillTable = scan.getTable().unwrap(DrillTable.class); + if (drillTable == null) { + drillTable = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable(); + } + final Object selection = drillTable.getSelection(); + if (selection instanceof FormatSelection + && ((FormatSelection)selection).supportDirPruning()) { + return true; // Do directory-based pruning in Calcite logical + } else { + return false; // Do not do directory-based pruning in Calcite logical + } + } else if (scan instanceof DrillScanRel) { + final GroupScan groupScan = ((DrillScanRel) scan).getGroupScan(); + // this rule is applicable only for dfs based partition pruning in Drill Logical + return groupScan instanceof FileGroupScan && groupScan.supportsPartitionFilterPushdown() && !((DrillScanRel)scan).partitionFilterPushdown(); + } + return false; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 686f7d7817f..d6bdc78c1c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -534,6 +534,15 @@ protected DrillRel addRenamedProject(DrillRel rel, RelDataType validatedRowType) private RelNode doLogicalPlanning(RelNode relNode) throws RelConversionException, SqlUnsupportedException { + // 1. Call HepPlanner with directory-based partition pruning, in Calcite logical rel + // so that pruned directories are removed before the group scan is built.
+ ImmutableSet dirPruneScanRules = ImmutableSet.builder() + .addAll(DrillRuleSets.getDirPruneScanRules(context)) + .build(); + + relNode = doHepPlan(relNode, dirPruneScanRules, HepMatchOrder.BOTTOM_UP); + log("Post-Dir-Pruning", relNode, logger); + if (! context.getPlannerSettings().isHepOptEnabled()) { return planner.transform(DrillSqlWorker.LOGICAL_RULES, relNode.getTraitSet().plus(DrillRel.DRILL_LOGICAL), relNode); } else { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java index 0d49b5c0b22..bc3cef3387f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java @@ -52,7 +52,7 @@ public class FileSelection { * @param files list of files * @param selectionRoot root path for selections */ - protected FileSelection(final List statuses, final List files, final String selectionRoot) { + public FileSelection(final List statuses, final List files, final String selectionRoot) { this.statuses = statuses; this.files = files; this.selectionRoot = Preconditions.checkNotNull(selectionRoot); @@ -246,4 +246,8 @@ public List getFileStatuses() { return statuses; } + public boolean supportDirPruning() { + return true; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java index 4473c5c4670..f802fb47d87 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java @@ -61,4 +61,8 @@ public FileSelection getSelection(){ return selection; } + @JsonIgnore + public boolean supportDirPruning() { + return selection.supportDirPruning(); + } } diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java index 33dccd6896e..93201bb91b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFileSelection.java @@ -59,4 +59,8 @@ public static ParquetFileSelection create(final FileSelection selection, final P return new ParquetFileSelection(selection, metadata); } + @Override + public boolean supportDirPruning() { + return false; + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index 0ca6bb9281a..97df2ee9082 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -1195,8 +1195,4 @@ public void testDateImplicitCasting() throws Exception { .run(); } - @Test - public void t() throws Exception { - test("explain plan for select a from dfs.tmp.foo where dir0 = 1"); - } } \ No newline at end of file diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java index ba7078837ff..e5d66031995 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java @@ -345,4 +345,20 @@ public void testPartitionFilter_Json_WithFlatten() throws Exception { testIncludeFilter(query, 1, "Filter", 1); } + @Test + public void testLogicalDirPruning() throws Exception { + // 1995/Q1 contains one valid parquet, while 1996/Q1 contains bad format parquet. + // If dir pruning happens in logical, the query will run fine, since the bad parquet has been pruned before we build ParquetGroupScan.
+ String query = String.format("select dir0, o_custkey from dfs_test.`%s/multilevel/parquetWithBadFormat` where dir0=1995", TEST_RES_PATH); + testExcludeFilter(query, 1, "Filter", 10); + } + + @Test + public void testLogicalDirPruning2() throws Exception { + // 1995/Q1 contains one valid parquet, while 1996/Q1 contains bad format parquet. + // If dir pruning happens in logical, the query will run fine, since the bad parquet has been pruned before we build ParquetGroupScan. + String query = String.format("select dir0, o_custkey from dfs_test.`%s/multilevel/parquetWithBadFormat` where dir0=1995 and o_custkey > 0", TEST_RES_PATH); + testIncludeFilter(query, 1, "Filter", 10); + } + } \ No newline at end of file diff --git a/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet new file mode 100644 index 00000000000..93514c4dae1 Binary files /dev/null and b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1995/Q1/orders_95_q1.parquet differ diff --git a/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet new file mode 100644 index 00000000000..f62d37d1cf1 --- /dev/null +++ b/exec/java-exec/src/test/resources/multilevel/parquetWithBadFormat/1996/Q1/badFormat.parquet @@ -0,0 +1 @@ +BAD FORMAT!!!