Skip to content

Commit

Permalink
DRILL-2517: Move directory-based partition pruning to Calcite logical…
Browse files Browse the repository at this point in the history
… planning phase.

1) Make the directory-based pruning rule work in both the Calcite logical and the Drill logical planning phases.

2) Only apply directory-based pruning in logical phase when there is no metadata cache.

3) Make FileSelection constructor public, since FileSelection.create() would modify selectionRoot.
  • Loading branch information
jinfengni committed Jan 30, 2016
1 parent c7dfba2 commit 9b4008d
Show file tree
Hide file tree
Showing 17 changed files with 281 additions and 338 deletions.
Expand Up @@ -19,6 +19,7 @@


import io.netty.buffer.DrillBuf; import io.netty.buffer.DrillBuf;


import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets; import org.apache.calcite.util.BitSets;
import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath;
Expand All @@ -27,6 +28,7 @@
import org.apache.drill.exec.planner.AbstractPartitionDescriptor; import org.apache.drill.exec.planner.AbstractPartitionDescriptor;
import org.apache.drill.exec.planner.PartitionDescriptor; import org.apache.drill.exec.planner.PartitionDescriptor;
import org.apache.drill.exec.planner.PartitionLocation; import org.apache.drill.exec.planner.PartitionLocation;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.hive.HiveUtilities; import org.apache.drill.exec.store.hive.HiveUtilities;
Expand Down Expand Up @@ -89,27 +91,6 @@ public String getBaseTableLocation() {
return origEntry.table.getTable().getSd().getLocation(); return origEntry.table.getTable().getSd().getLocation();
} }


/**
 * Builds a Hive group scan restricted to the partitions whose storage location
 * appears in {@code newFiles}.
 *
 * <p>A partition is added once for every entry of {@code newFiles} that equals its
 * location, so repeated entries in {@code newFiles} yield repeated partitions.
 *
 * @param newFiles partition locations that should remain in the scan
 * @return a clone of the current Hive scan reading only the matching partitions
 * @throws ExecutionSetupException if cloning the Hive scan fails
 */
@Override
public GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetupException {
  final HiveScan currentScan = (HiveScan) scanRel.getGroupScan();
  final HiveReadEntry currentEntry = currentScan.hiveReadEntry;
  final List<HiveTable.HivePartition> candidates = currentEntry.partitions;
  final List<HiveTable.HivePartition> retained = new LinkedList<>();

  for (final HiveTable.HivePartition candidate : candidates) {
    final String candidateLocation = candidate.getPartition().getSd().getLocation();
    for (final String wantedLocation : newFiles) {
      if (!candidateLocation.equals(wantedLocation)) {
        continue;
      }
      retained.add(candidate);
    }
  }

  return currentScan.clone(new HiveReadEntry(currentEntry.table, retained));
}

@Override @Override
public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions, public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions,
BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) { BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) {
Expand Down Expand Up @@ -169,4 +150,37 @@ protected void createPartitionSublists() {
sublistsCreated = true; sublistsCreated = true;
} }


/**
 * Creates a {@link DrillScanRel} that reads only the given Hive partitions.
 *
 * <p>The new rel reuses the cluster, table, row type and columns of the original
 * scan, adds the {@code DRILL_LOGICAL} trait to its trait set, and is created with
 * the filter-pushdown flag set to {@code true}.
 *
 * @param newPartitions locations of the partitions to keep
 * @return the scan rel wrapping the pruned group scan
 * @throws Exception if the pruned group scan cannot be created
 */
@Override
public TableScan createTableScan(List<String> newPartitions) throws Exception {
  final GroupScan prunedGroupScan = createNewGroupScan(newPartitions);
  final TableScan prunedScanRel = new DrillScanRel(
      scanRel.getCluster(),
      scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
      scanRel.getTable(),
      prunedGroupScan,
      scanRel.getRowType(),
      scanRel.getColumns(),
      true /*filter pushdown*/);
  return prunedScanRel;
}

/**
 * Builds a Hive group scan limited to the partitions whose storage location is
 * listed in {@code newFiles}.
 *
 * <p>Each entry of {@code newFiles} equal to a partition's location adds that
 * partition once, so duplicate entries produce duplicate partitions.
 *
 * @param newFiles partition locations that should remain in the scan
 * @return a clone of the current Hive scan reading only the matching partitions
 * @throws ExecutionSetupException if cloning the Hive scan fails
 */
private GroupScan createNewGroupScan(List<String> newFiles) throws ExecutionSetupException {
  final HiveScan currentScan = (HiveScan) scanRel.getGroupScan();
  final HiveReadEntry currentEntry = currentScan.hiveReadEntry;
  final List<HiveTable.HivePartition> candidates = currentEntry.partitions;
  final List<HiveTable.HivePartition> retained = new LinkedList<>();

  for (final HiveTable.HivePartition candidate : candidates) {
    final String candidateLocation = candidate.getPartition().getSd().getLocation();
    for (final String wantedLocation : newFiles) {
      if (!candidateLocation.equals(wantedLocation)) {
        continue;
      }
      retained.add(candidate);
    }
  }

  return currentScan.clone(new HiveReadEntry(currentEntry.table, retained));
}


} }
Expand Up @@ -19,6 +19,7 @@
package org.apache.drill.exec.planner.sql.logical; package org.apache.drill.exec.planner.sql.logical;


import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.PartitionDescriptor; import org.apache.drill.exec.planner.PartitionDescriptor;
Expand All @@ -44,7 +45,7 @@ public static final StoragePluginOptimizerRule getFilterOnProject(OptimizerRules
optimizerRulesContext) { optimizerRulesContext) {


@Override @Override
public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) { public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(), return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
defaultPartitionValue); defaultPartitionValue);
} }
Expand All @@ -63,9 +64,9 @@ public boolean matches(RelOptRuleCall call) {


@Override @Override
public void onMatch(RelOptRuleCall call) { public void onMatch(RelOptRuleCall call) {
final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); final DrillFilterRel filterRel = call.rel(0);
final DrillProjectRel projectRel = (DrillProjectRel) call.rel(1); final DrillProjectRel projectRel = call.rel(1);
final DrillScanRel scanRel = (DrillScanRel) call.rel(2); final DrillScanRel scanRel = call.rel(2);
doOnMatch(call, filterRel, projectRel, scanRel); doOnMatch(call, filterRel, projectRel, scanRel);
} }
}; };
Expand All @@ -78,7 +79,7 @@ public static final StoragePluginOptimizerRule getFilterOnScan(OptimizerRulesCon
"HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", optimizerRulesContext) { "HivePushPartitionFilterIntoScan:Filter_On_Scan_Hive", optimizerRulesContext) {


@Override @Override
public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, RelNode scanRel) { public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(), return new HivePartitionDescriptor(settings, (DrillScanRel) scanRel, getOptimizerRulesContext().getManagedBuffer(),
defaultPartitionValue); defaultPartitionValue);
} }
Expand All @@ -97,8 +98,8 @@ public boolean matches(RelOptRuleCall call) {


@Override @Override
public void onMatch(RelOptRuleCall call) { public void onMatch(RelOptRuleCall call) {
final DrillFilterRel filterRel = (DrillFilterRel) call.rel(0); final DrillFilterRel filterRel = call.rel(0);
final DrillScanRel scanRel = (DrillScanRel) call.rel(1); final DrillScanRel scanRel = call.rel(1);
doOnMatch(call, filterRel, null, scanRel); doOnMatch(call, filterRel, null, scanRel);
} }
}; };
Expand Down
Expand Up @@ -24,19 +24,25 @@
import java.util.Map; import java.util.Map;


import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;


import org.apache.calcite.adapter.enumerable.EnumerableTableScan; import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets; import org.apache.calcite.util.BitSets;
import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types; import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.FileSelection;
Expand All @@ -53,14 +59,22 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
private final String partitionLabel; private final String partitionLabel;
private final int partitionLabelLength; private final int partitionLabelLength;
private final Map<String, Integer> partitions = Maps.newHashMap(); private final Map<String, Integer> partitions = Maps.newHashMap();
private final EnumerableTableScan scanRel; private final TableScan scanRel;
private final DynamicDrillTable table; private final DrillTable table;


public FileSystemPartitionDescriptor(PlannerSettings settings, RelNode scanRel) { public FileSystemPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
Preconditions.checkArgument(scanRel instanceof DrillScanRel || scanRel instanceof EnumerableTableScan);
this.partitionLabel = settings.getFsPartitionColumnLabel(); this.partitionLabel = settings.getFsPartitionColumnLabel();
this.partitionLabelLength = partitionLabel.length(); this.partitionLabelLength = partitionLabel.length();
this.scanRel = (EnumerableTableScan) scanRel; this.scanRel = scanRel;
table = scanRel.getTable().unwrap(DynamicDrillTable.class); DrillTable unwrap;
unwrap = scanRel.getTable().unwrap(DrillTable.class);
if (unwrap == null) {
unwrap = scanRel.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
}

table = unwrap;

for(int i =0; i < 10; i++){ for(int i =0; i < 10; i++){
partitions.put(partitionLabel + i, i); partitions.put(partitionLabel + i, i);
} }
Expand All @@ -87,18 +101,18 @@ public int getMaxHierarchyLevel() {
return MAX_NESTED_SUBDIRS; return MAX_NESTED_SUBDIRS;
} }


@Override // @Override
public GroupScan createNewGroupScan(List<String> newFiles) throws IOException { // public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
/* // if (scanRel instanceof DrillScanRel) {
THIS NEEDS TO CHANGE. WE SHOULD RETURN A ENUMERABLETABLESCAN?? // final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation());
final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true); // final FileGroupScan newScan = ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection); // return newScan;
return newScan; // } else {
*/ // throw new UnsupportedOperationException("Does not allow to get groupScan for EnumerableTableScan");
return null; // }
} // }


public DynamicDrillTable getTable() { public DrillTable getTable() {
return table; return table;
} }


Expand Down Expand Up @@ -152,4 +166,37 @@ protected void createPartitionSublists() {
sublistsCreated = true; sublistsCreated = true;
} }


/**
 * Creates a table scan that reads only {@code newFiles}.
 *
 * <p>When the wrapped scan is a {@link DrillScanRel}, its file group scan is cloned
 * with a new {@link FileSelection} and wrapped in a new {@code DrillScanRel} carrying
 * the {@code DRILL_LOGICAL} trait. When it is an {@link EnumerableTableScan}, the
 * work is delegated to {@code createNewTableScanFromSelection}.
 *
 * @param newFiles files that should remain in the scan
 * @return the new scan rel over the reduced file selection
 * @throws UnsupportedOperationException if the wrapped scan is of any other type
 * @throws Exception declared by the overridden method
 */
@Override
public TableScan createTableScan(List<String> newFiles) throws Exception {
  if (scanRel instanceof DrillScanRel) {
    final DrillScanRel drillScan = (DrillScanRel) scanRel;
    final FileSelection prunedSelection = new FileSelection(null, newFiles, getBaseTableLocation());
    final FileGroupScan prunedGroupScan =
        ((FileGroupScan) drillScan.getGroupScan()).clone(prunedSelection);
    return new DrillScanRel(
        drillScan.getCluster(),
        drillScan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
        drillScan.getTable(),
        prunedGroupScan,
        drillScan.getRowType(),
        drillScan.getColumns(),
        true /*filter pushdown*/);
  }

  if (scanRel instanceof EnumerableTableScan) {
    return createNewTableScanFromSelection((EnumerableTableScan) scanRel, newFiles);
  }

  throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
}

/**
 * Rebuilds an {@link EnumerableTableScan} over a copy of the underlying Drill table
 * whose file selection is replaced by {@code newFiles}.
 *
 * <p>The format of the current selection, and the plugin, storage engine name and
 * user name of the current table, are carried over to the new table. The relational
 * schema and row type are taken from the old scan's table.
 *
 * @param oldScan the scan being replaced; its table must be a {@link RelOptTableImpl}
 * @param newFiles files that should remain in the selection
 * @return a new enumerable scan on the same cluster reading the reduced selection
 */
private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles) {
  final RelOptTableImpl oldOptTable = (RelOptTableImpl) oldScan.getTable();
  final FormatSelection oldFormatSelection = (FormatSelection) table.getSelection();

  final FileSelection prunedFiles = new FileSelection(null, newFiles, getBaseTableLocation());
  final FormatSelection prunedFormatSelection =
      new FormatSelection(oldFormatSelection.getFormat(), prunedFiles);

  final DynamicDrillTable prunedDrillTable = new DynamicDrillTable(
      table.getPlugin(),
      table.getStorageEngineName(),
      table.getUserName(),
      prunedFormatSelection);
  final DrillTranslatableTable prunedTranslatableTable = new DrillTranslatableTable(prunedDrillTable);

  final RelOptTableImpl prunedOptTable = RelOptTableImpl.create(
      oldOptTable.getRelOptSchema(), oldOptTable.getRowType(), prunedTranslatableTable);

  return EnumerableTableScan.create(oldScan.getCluster(), prunedOptTable);
}

} }
Expand Up @@ -17,11 +17,13 @@
*/ */
package org.apache.drill.exec.planner; package org.apache.drill.exec.planner;


import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets; import org.apache.calcite.util.BitSets;
import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.FileGroupScan; import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.FileSelection;
Expand Down Expand Up @@ -78,8 +80,7 @@ public int getMaxHierarchyLevel() {
return partitionColumns.size(); return partitionColumns.size();
} }


@Override private GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation()); final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation());
final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection); final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
return newScan; return newScan;
Expand Down Expand Up @@ -128,4 +129,16 @@ protected void createPartitionSublists() {
sublistsCreated = true; sublistsCreated = true;
} }


/**
 * Creates a {@link DrillScanRel} that reads only the given files.
 *
 * <p>The new rel reuses the cluster, table, row type and columns of the original
 * scan, adds the {@code DRILL_LOGICAL} trait to its trait set, and is created with
 * the filter-pushdown flag set to {@code true}.
 *
 * @param newFiles files that should remain in the scan
 * @return the scan rel wrapping the group scan built for {@code newFiles}
 * @throws Exception if the new group scan cannot be created
 */
@Override
public TableScan createTableScan(List<String> newFiles) throws Exception {
  final GroupScan prunedGroupScan = createNewGroupScan(newFiles);
  final TableScan prunedScanRel = new DrillScanRel(
      scanRel.getCluster(),
      scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
      scanRel.getTable(),
      prunedGroupScan,
      scanRel.getRowType(),
      scanRel.getColumns(),
      true /*filter pushdown*/);
  return prunedScanRel;
}
} }
Expand Up @@ -17,6 +17,7 @@
*/ */
package org.apache.drill.exec.planner; package org.apache.drill.exec.planner;


import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.GroupScan;
Expand Down Expand Up @@ -53,8 +54,6 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
// Maximum level of partition nesting/ hierarchy supported // Maximum level of partition nesting/ hierarchy supported
public int getMaxHierarchyLevel(); public int getMaxHierarchyLevel();


public GroupScan createNewGroupScan(List<String> newFiles) throws Exception;

/** /**
* Method creates an in memory representation of all the partitions. For each level of partitioning we * Method creates an in memory representation of all the partitions. For each level of partitioning we
* will create a value vector which this method will populate for all the partitions with the values of the * will create a value vector which this method will populate for all the partitions with the values of the
Expand All @@ -74,4 +73,13 @@ void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> par
* @return * @return
*/ */
TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings); TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings);

/**
 * Creates a new TableScan rel node that scans only the given partitions or files.
 *
 * @param newPartitions the partitions (or files) that the new scan should read
 * @return the new TableScan rel node
 * @throws Exception if the new scan cannot be created
 */
public TableScan createTableScan(List<String> newPartitions) throws Exception;

} }
Expand Up @@ -54,8 +54,14 @@ public void onMatch(RelOptRuleCall call) {
try { try {
ProjectPushInfo columnInfo = PrelUtil.getColumns(scan.getRowType(), proj.getProjects()); ProjectPushInfo columnInfo = PrelUtil.getColumns(scan.getRowType(), proj.getProjects());


// get DrillTable, either wrapped in RelOptTable, or DrillTranslatableTable.
DrillTable table = scan.getTable().unwrap(DrillTable.class);
if (table == null) {
table = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
}

if (columnInfo == null || columnInfo.isStarQuery() // if (columnInfo == null || columnInfo.isStarQuery() //
|| !scan.getTable().unwrap(DrillTable.class) // || !table //
.getGroupScan().canPushdownProjects(columnInfo.columns)) { .getGroupScan().canPushdownProjects(columnInfo.columns)) {
return; return;
} }
Expand Down
Expand Up @@ -204,8 +204,8 @@ public static RuleSet getDrillBasicRules(OptimizerRulesContext optimizerRulesCon
public static RuleSet getPruneScanRules(OptimizerRulesContext optimizerRulesContext) { public static RuleSet getPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder() final ImmutableSet<RelOptRule> pruneRules = ImmutableSet.<RelOptRule>builder()
.add( .add(
PruneScanRule.getFilterOnProject(optimizerRulesContext), PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
PruneScanRule.getFilterOnScan(optimizerRulesContext), PruneScanRule.getDirFilterOnScan(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext), ParquetPruneScanRule.getFilterOnProjectParquet(optimizerRulesContext),
ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext) ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext)
) )
Expand All @@ -214,6 +214,23 @@ public static RuleSet getPruneScanRules(OptimizerRulesContext optimizerRulesCont
return new DrillRuleSet(pruneRules); return new DrillRuleSet(pruneRules);
} }


/**
 * Returns the immutable set of directory-based partition pruning rules that will be
 * used in Calcite logical planning.
 *
 * @param optimizerRulesContext context passed to the factory method of each pruning rule
 * @return a rule set holding the directory filter-on-project and filter-on-scan rules
 */
public static RuleSet getDirPruneScanRules(OptimizerRulesContext optimizerRulesContext) {
  final RelOptRule dirFilterOnProject = PruneScanRule.getDirFilterOnProject(optimizerRulesContext);
  final RelOptRule dirFilterOnScan = PruneScanRule.getDirFilterOnScan(optimizerRulesContext);

  return new DrillRuleSet(ImmutableSet.<RelOptRule>of(dirFilterOnProject, dirFilterOnScan));
}

// Ruleset for join permutation, used only in VolcanoPlanner. // Ruleset for join permutation, used only in VolcanoPlanner.
public static RuleSet getJoinPermRules(OptimizerRulesContext optimizerRulesContext) { public static RuleSet getJoinPermRules(OptimizerRulesContext optimizerRulesContext) {
return new DrillRuleSet(ImmutableSet.<RelOptRule> builder().add( // return new DrillRuleSet(ImmutableSet.<RelOptRule> builder().add( //
Expand Down

0 comments on commit 9b4008d

Please sign in to comment.