Skip to content

Commit

Permalink
Keep track of whether all partitions were previously pruned and proce…
Browse files Browse the repository at this point in the history
…ss this state where needed.
  • Loading branch information
Aman Sinha committed Jul 15, 2016
1 parent 7ee0dbb commit ce94e7f
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ protected void createPartitionSublists() {
}

@Override
public TableScan createTableScan(List<PartitionLocation> newPartitions) throws Exception {
public TableScan createTableScan(List<PartitionLocation> newPartitions, boolean wasAllPartitionsPruned /* ignored */) throws Exception {
GroupScan newGroupScan = createNewGroupScan(newPartitions);
return new DrillScanRel(scanRel.getCluster(),
scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public boolean supportsSinglePartOptimization() {


@Override
public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot) throws Exception {
public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
boolean isAllPruned) throws Exception {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,16 @@ public boolean isCompositePartition() {
return true;
}

/**
 * Builds the directory-only path of this composite partition by joining the leading
 * non-null entries of {@code dirs}, each prefixed with {@code "/"}.
 *
 * <p>Iteration stops at the first {@code null} entry, so only the contiguous prefix of
 * directory names contributes to the result.
 *
 * @return the concatenated path (for example {@code "/a/b"}), or an empty string when
 *         {@code dirs} is empty or its first entry is {@code null}
 */
@Override
public String getCompositePartitionPath() {
  // StringBuilder avoids re-copying the accumulated path on every iteration.
  final StringBuilder path = new StringBuilder();
  for (int i = 0; i < dirs.length; i++) {
    if (dirs[i] == null) { // only the leading prefix is wanted; stop at the first gap
      break;
    }
    path.append('/').append(dirs[i]);
  }
  return path.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ protected Pair<Collection<String>, Boolean> getFileLocationsAndStatus() {
}

@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot) throws Exception {
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
boolean wasAllPartitionsPruned) throws Exception {
List<String> newFiles = Lists.newArrayList();
for (final PartitionLocation location : newPartitionLocation) {
if (!location.isCompositePartition()) {
Expand All @@ -225,7 +226,7 @@ public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, S
if (scanRel instanceof DrillScanRel) {
final FormatSelection formatSelection = (FormatSelection)table.getSelection();
final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
cacheFileRoot, formatSelection.getSelection().getDirStatus());
cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
final FileGroupScan newGroupScan =
((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
return new DrillScanRel(scanRel.getCluster(),
Expand All @@ -236,17 +237,19 @@ public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, S
((DrillScanRel) scanRel).getColumns(),
true /*filter pushdown*/);
} else if (scanRel instanceof EnumerableTableScan) {
return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot);
return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot,
wasAllPartitionsPruned);
} else {
throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
}
}

private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles, String cacheFileRoot) {
private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles, String cacheFileRoot,
boolean wasAllPartitionsPruned) {
final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
final FormatSelection formatSelection = (FormatSelection) table.getSelection();
final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
cacheFileRoot, formatSelection.getSelection().getDirStatus());
cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
final DrillTranslatableTable newTable = new DrillTranslatableTable(
new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
Expand All @@ -258,8 +261,9 @@ private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, L
}

@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation) throws Exception {
return createTableScan(newPartitionLocation, null);
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
boolean wasAllPartitionsPruned) throws Exception {
return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ public int getMaxHierarchyLevel() {
return partitionColumns.size();
}

private GroupScan createNewGroupScan(List<String> newFiles, String cacheFileRoot) throws IOException {
final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation(), cacheFileRoot);
private GroupScan createNewGroupScan(List<String> newFiles, String cacheFileRoot,
boolean wasAllPartitionsPruned) throws IOException {
final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation(),
cacheFileRoot, wasAllPartitionsPruned);
final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
return newScan;
}
Expand Down Expand Up @@ -131,13 +133,14 @@ protected void createPartitionSublists() {
}

@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot) throws Exception {
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
boolean wasAllPartitionsPruned) throws Exception {
List<String> newFiles = Lists.newArrayList();
for (final PartitionLocation location : newPartitionLocation) {
newFiles.add(location.getEntirePartitionLocation());
}

final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot);
final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot, wasAllPartitionsPruned);

return new DrillScanRel(scanRel.getCluster(),
scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
Expand All @@ -149,8 +152,9 @@ public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, S
}

@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation) throws Exception {
return createTableScan(newPartitionLocation, null);
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
boolean wasAllPartitionsPruned) throws Exception {
return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,24 @@ void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> par
/**
* Creates a new TableScan rel node, given the list of new partitions or new files to scan.
* @param newPartitions
* @param wasAllPartitionsPruned
* @return
* @throws Exception
*/
public TableScan createTableScan(List<PartitionLocation> newPartitions) throws Exception;
public TableScan createTableScan(List<PartitionLocation> newPartitions,
boolean wasAllPartitionsPruned) throws Exception;

/**
* Create a new TableScan rel node, given the lists of new partitions or new files to scan and a path
* to a metadata cache file
* @param newPartitions
* @param cacheFileRoot
* @param wasAllPartitionsPruned
* @return
* @throws Exception
*/
public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot) throws Exception;
public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
boolean wasAllPartitionsPruned) throws Exception;

public boolean supportsSinglePartOptimization();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,9 @@ public interface PartitionLocation {
*/
public boolean isCompositePartition();

/**
 * Returns the path string made up of directory names only. This is meaningful only for a
 * composite partition (see {@link #isCompositePartition()}).
 *
 * @return the directory-only path string of this composite partition
 */
public String getCompositePartitionPath();

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public boolean isCompositePartition() {
return false;
}

/**
 * Not supported by this implementation: it provides no composite partition path.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public String getCompositePartitionPath() {
throw new UnsupportedOperationException();
}

@Override
public List<SimplePartitionLocation> getPartitionLocationRecursive() {
return ImmutableList.of(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
import org.apache.drill.exec.planner.PartitionDescriptor;
import org.apache.drill.exec.planner.PartitionLocation;
import org.apache.drill.exec.planner.SimplePartitionLocation;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillScanRel;
Expand Down Expand Up @@ -79,7 +80,6 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class);

final OptimizerRulesContext optimizerContext;
boolean wasAllPartitionsPruned = false; // whether all partitions were previously eliminated

public PruneScanRule(RelOptRuleOperand operand, String id, OptimizerRulesContext optimizerContext) {
super(operand, id);
Expand Down Expand Up @@ -144,10 +144,6 @@ public static final RelOptRule getDirFilterOnScan(OptimizerRulesContext optimize
}

protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
if (wasAllPartitionsPruned) {
// if previously we had already pruned out all the partitions, we should exit early
return;
}

final String pruningClassName = getClass().getName();
logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
Expand Down Expand Up @@ -359,15 +355,25 @@ protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectR

// handle the case all partitions are filtered out.
boolean canDropFilter = true;
boolean wasAllPartitionsPruned = false;
String cacheFileRoot = null;

if (newPartitions.isEmpty()) {
assert firstLocation != null;
// Add the first non-composite partition location, since execution requires schema.
// In such case, we should not drop filter.
newPartitions.add(firstLocation.getPartitionLocationRecursive().get(0));
canDropFilter = false;
// NOTE: with DRILL-4530, the PruneScanRule may be called with only a list of
// directories first and the non-composite partition location will still return
// directories, not files. So, additional processing is done depending on this flag
wasAllPartitionsPruned = true;
logger.info("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal);

// set the cacheFileRoot appropriately
if (firstLocation.isCompositePartition()) {
cacheFileRoot = descriptor.getBaseTableLocation() + firstLocation.getCompositePartitionPath();
}
}

logger.info("Pruned {} partitions down to {}", numTotal, newPartitions.size());
Expand All @@ -382,8 +388,7 @@ protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectR
condition = condition.accept(reverseVisitor);
pruneCondition = pruneCondition.accept(reverseVisitor);

String cacheFileRoot = null;
if (checkForSingle && isSinglePartition) {
if (checkForSingle && isSinglePartition && !wasAllPartitionsPruned) {
// if metadata cache file could potentially be used, then assign a proper cacheFileRoot
String path = "";
for (int j = 0; j <= maxIndex; j++) {
Expand All @@ -393,7 +398,8 @@ protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectR
}

RelNode inputRel = descriptor.supportsSinglePartOptimization() ?
descriptor.createTableScan(newPartitions, cacheFileRoot) : descriptor.createTableScan(newPartitions);
descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned) :
descriptor.createTableScan(newPartitions, wasAllPartitionsPruned);

if (projectRel != null) {
inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ private enum StatusType {
}

private StatusType dirStatus;
private boolean hadWildcard = false; // whether this selection previously had a wildcard
// whether this selection previously had a wildcard
private boolean hadWildcard = false;
// whether all partitions were previously pruned for this selection
private boolean wasAllPartitionsPruned = false;

/**
* Creates a {@link FileSelection selection} out of given file statuses/files and selection root.
Expand All @@ -75,20 +78,22 @@ private enum StatusType {
* @param selectionRoot root path for selections
*/
public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
this(statuses, files, selectionRoot, null, StatusType.NOT_CHECKED);
this(statuses, files, selectionRoot, null, false, StatusType.NOT_CHECKED);
}

public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot, final String cacheFileRoot) {
this(statuses, files, selectionRoot, cacheFileRoot, StatusType.NOT_CHECKED);
public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot,
final String cacheFileRoot, final boolean wasAllPartitionsPruned) {
this(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned, StatusType.NOT_CHECKED);
}

public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot,
final String cacheFileRoot, final StatusType dirStatus) {
final String cacheFileRoot, final boolean wasAllPartitionsPruned, final StatusType dirStatus) {
this.statuses = statuses;
this.files = files;
this.selectionRoot = Preconditions.checkNotNull(selectionRoot);
this.dirStatus = dirStatus;
this.cacheFileRoot = cacheFileRoot;
this.wasAllPartitionsPruned = wasAllPartitionsPruned;
}

/**
Expand All @@ -101,6 +106,8 @@ protected FileSelection(final FileSelection selection) {
this.selectionRoot = selection.selectionRoot;
this.dirStatus = selection.dirStatus;
this.cacheFileRoot = selection.cacheFileRoot;
this.hadWildcard = selection.hadWildcard;
this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
}

public String getSelectionRoot() {
Expand Down Expand Up @@ -202,6 +209,10 @@ public StatusType getDirStatus() {
return dirStatus;
}

/**
 * Reports whether all partitions were previously pruned for this selection.
 *
 * @return the value of the {@code wasAllPartitionsPruned} field
 */
public boolean wasAllPartitionsPruned() {
return this.wasAllPartitionsPruned;
}

private static String commonPath(final List<FileStatus> statuses) {
if (statuses == null || statuses.isEmpty()) {
return "";
Expand Down Expand Up @@ -288,7 +299,7 @@ public static FileSelection create(final DrillFileSystem fs, final String parent
* @see FileSelection#FileSelection(List, List, String)
*/
public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root,
final String cacheFileRoot) {
final String cacheFileRoot, final boolean wasAllPartitionsPruned) {
final boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0);
final boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0);

Expand All @@ -308,11 +319,11 @@ public static FileSelection create(final List<FileStatus> statuses, final List<S
final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
selectionRoot = path.toString();
}
return new FileSelection(statuses, files, selectionRoot, cacheFileRoot);
return new FileSelection(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned);
}

public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root) {
return FileSelection.create(statuses, files, root, null);
return FileSelection.create(statuses, files, root, null, false);
}

public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection) {
Expand Down Expand Up @@ -368,7 +379,12 @@ public List<FileStatus> getFileStatuses() {
}

public boolean supportDirPrunig() {
return isExpandedFully() || isExpandedPartial();
if (isExpandedFully() || isExpandedPartial()) {
if (!wasAllPartitionsPruned) {
return true;
}
}
return false;
}

public void setHadWildcard(boolean wc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,13 @@ public long getRowCount() {

} else if (selection.isExpandedPartial() && cacheFileRoot != null) {
this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString());
for (Metadata.ParquetFileMetadata file : this.parquetTableMetadata.getFiles()) {
fileSet.add(file.getPath());
if (selection.wasAllPartitionsPruned()) {
// if all partitions were previously pruned, we only need to read 1 file (for the schema)
fileSet.add(this.parquetTableMetadata.getFiles().get(0).getPath());
} else {
for (Metadata.ParquetFileMetadata file : this.parquetTableMetadata.getFiles()) {
fileSet.add(file.getPath());
}
}
} else {
// we need to expand the files from fileStatuses
Expand All @@ -597,7 +602,6 @@ public long getRowCount() {
final Path metaPath = new Path(status.getPath(), Metadata.METADATA_FILENAME);
final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath.toString());
for (Metadata.ParquetFileMetadata file : metadata.getFiles()) {
// fileNames.add(file.getPath());
fileSet.add(file.getPath());
}
} else {
Expand Down Expand Up @@ -625,7 +629,8 @@ public long getRowCount() {
// because create() changes the root to include the scheme and authority; In future, if create()
// is the preferred way to instantiate a file selection, we may need to do something different...
// WARNING: file statuses and file names are inconsistent
FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString(), cacheFileRoot);
FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString(),
cacheFileRoot, selection.wasAllPartitionsPruned());

newSelection.setExpandedFully();
return newSelection;
Expand Down Expand Up @@ -913,7 +918,7 @@ public GroupScan applyLimit(long maxRecords) {
}

try {
FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot(), cacheFileRoot);
FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot(), cacheFileRoot, false);
logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), fileNames.size());
return this.clone(newSelection);
} catch (IOException e) {
Expand Down

0 comments on commit ce94e7f

Please sign in to comment.