Skip to content

Commit

Permalink
DRILL-4846: Fix a few performance issues for metadata access:
Browse files Browse the repository at this point in the history
 - Create a MetadataContext that can be shared among multiple invocations of the Metadata APIs.
 - Check directory modification time only if not previously checked.
 - Remove a redundant call for metadata read.
 - Added more logging.
 - Consolidate a couple of metadata methods.

close #569
  • Loading branch information
Aman Sinha committed Aug 17, 2016
1 parent 0a4c21c commit 57dc9f4
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 84 deletions.
Expand Up @@ -21,6 +21,7 @@
import java.util.List;

import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.exec.store.dfs.MetadataContext;

/**
* Abstract base class for file system based partition descriptors and Hive partition descriptors.
Expand Down Expand Up @@ -65,7 +66,7 @@ public boolean supportsMetadataCachePruning() {

@Override
public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
boolean isAllPruned) throws Exception {
boolean isAllPruned, MetadataContext metaContext) throws Exception {
throw new UnsupportedOperationException();
}

Expand Down
Expand Up @@ -49,6 +49,7 @@
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;

Expand Down Expand Up @@ -211,7 +212,7 @@ protected Pair<Collection<String>, Boolean> getFileLocationsAndStatus() {

@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
boolean wasAllPartitionsPruned) throws Exception {
boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception {
List<String> newFiles = Lists.newArrayList();
for (final PartitionLocation location : newPartitionLocation) {
if (!location.isCompositePartition()) {
Expand All @@ -228,6 +229,7 @@ public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, S
final FormatSelection formatSelection = (FormatSelection)table.getSelection();
final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
newFileSelection.setMetaContext(metaContext);
final FileGroupScan newGroupScan =
((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
return new DrillScanRel(scanRel.getCluster(),
Expand All @@ -239,18 +241,19 @@ public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, S
true /*filter pushdown*/);
} else if (scanRel instanceof EnumerableTableScan) {
return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot,
wasAllPartitionsPruned);
wasAllPartitionsPruned, metaContext);
} else {
throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
}
}

private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles, String cacheFileRoot,
boolean wasAllPartitionsPruned) {
boolean wasAllPartitionsPruned, MetadataContext metaContext) {
final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
final FormatSelection formatSelection = (FormatSelection) table.getSelection();
final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
newFileSelection.setMetaContext(metaContext);
final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
final DrillTranslatableTable newTable = new DrillTranslatableTable(
new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
Expand All @@ -265,7 +268,7 @@ private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, L
@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
boolean wasAllPartitionsPruned) throws Exception {
return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned);
return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned, null);
}

@Override
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.parquet.ParquetGroupScan;
import org.apache.drill.exec.vector.ValueVector;

Expand Down Expand Up @@ -81,9 +82,10 @@ public int getMaxHierarchyLevel() {
}

private GroupScan createNewGroupScan(List<String> newFiles, String cacheFileRoot,
boolean wasAllPartitionsPruned) throws IOException {
boolean wasAllPartitionsPruned, MetadataContext metaContext) throws IOException {
final FileSelection newSelection = FileSelection.create(null, newFiles, getBaseTableLocation(),
cacheFileRoot, wasAllPartitionsPruned);
newSelection.setMetaContext(metaContext);
final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newSelection);
return newScan;
}
Expand Down Expand Up @@ -134,13 +136,13 @@ protected void createPartitionSublists() {

@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
boolean wasAllPartitionsPruned) throws Exception {
boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception {
List<String> newFiles = Lists.newArrayList();
for (final PartitionLocation location : newPartitionLocation) {
newFiles.add(location.getEntirePartitionLocation());
}

final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot, wasAllPartitionsPruned);
final GroupScan newGroupScan = createNewGroupScan(newFiles, cacheFileRoot, wasAllPartitionsPruned, metaContext);

return new DrillScanRel(scanRel.getCluster(),
scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
Expand All @@ -154,7 +156,7 @@ public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, S
@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
boolean wasAllPartitionsPruned) throws Exception {
return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned);
return createTableScan(newPartitionLocation, null, wasAllPartitionsPruned, null);
}

}
Expand Up @@ -22,6 +22,7 @@
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.vector.ValueVector;

import java.util.BitSet;
Expand Down Expand Up @@ -90,11 +91,12 @@ public TableScan createTableScan(List<PartitionLocation> newPartitions,
   * @param newPartitions list of partitions to be scanned after pruning
   * @param cacheFileRoot root directory for the metadata cache files, if applicable
   * @param wasAllPartitionsPruned true if pruning eliminated all partitions
   * @param metaContext metadata context shared across metadata API invocations; may be null
   * @return the newly created table scan
   * @throws Exception if the new scan cannot be created
*/
public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
boolean wasAllPartitionsPruned) throws Exception;
boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception;

  /** Whether this partition descriptor supports pruning based on the metadata cache files. */
  public boolean supportsMetadataCachePruning();

Expand Down
Expand Up @@ -60,6 +60,7 @@
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
Expand Down Expand Up @@ -406,8 +407,13 @@ protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectR

}

final Object selection = getDrillTable(scanRel).getSelection();
MetadataContext metaContext = null;
if (selection instanceof FormatSelection) {
metaContext = ((FormatSelection)selection).getSelection().getMetaContext();
}
RelNode inputRel = descriptor.supportsMetadataCachePruning() ?
descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned) :
descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned, metaContext) :
descriptor.createTableScan(newPartitions, wasAllPartitionsPruned);

if (projectRel != null) {
Expand Down Expand Up @@ -488,14 +494,18 @@ protected OptimizerRulesContext getOptimizerRulesContext() {

public abstract PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel);

private static DrillTable getDrillTable(final TableScan scan) {
DrillTable drillTable;
drillTable = scan.getTable().unwrap(DrillTable.class);
if (drillTable == null) {
drillTable = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
}
return drillTable;
}

private static boolean isQualifiedDirPruning(final TableScan scan) {
if (scan instanceof EnumerableTableScan) {
DrillTable drillTable;
drillTable = scan.getTable().unwrap(DrillTable.class);
if (drillTable == null) {
drillTable = scan.getTable().unwrap(DrillTranslatableTable.class).getDrillTable();
}
final Object selection = drillTable.getSelection();
final Object selection = getDrillTable(scan).getSelection();
if (selection instanceof FormatSelection
&& ((FormatSelection)selection).supportDirPruning()) {
return true; // Do directory-based pruning in Calcite logical
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;
Expand All @@ -30,7 +31,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import com.google.common.collect.Maps;

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -56,6 +57,11 @@ public class FileSelection {
*/
public final String cacheFileRoot;

  /**
   * Metadata context shared across multiple metadata API invocations; null if none has been set.
   */
private MetadataContext metaContext = null;

private enum StatusType {
NOT_CHECKED, // initial state
NO_DIRS, // no directories in this selection
Expand Down Expand Up @@ -106,6 +112,7 @@ protected FileSelection(final FileSelection selection) {
this.selectionRoot = selection.selectionRoot;
this.dirStatus = selection.dirStatus;
this.cacheFileRoot = selection.cacheFileRoot;
this.metaContext = selection.metaContext;
this.hadWildcard = selection.hadWildcard;
this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
}
Expand All @@ -124,7 +131,7 @@ public List<FileStatus> getStatuses(final DrillFileSystem fs) throws IOException
}
statuses = newStatuses;
}
logger.debug("FileSelection.getStatuses() took {} ms, numFiles: {}",
logger.info("FileSelection.getStatuses() took {} ms, numFiles: {}",
timer.elapsed(TimeUnit.MILLISECONDS), statuses == null ? 0 : statuses.size());

return statuses;
Expand Down Expand Up @@ -328,6 +335,7 @@ public static FileSelection create(final List<FileStatus> statuses, final List<S

public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection,
final String cacheFileRoot) {
Stopwatch timer = Stopwatch.createStarted();
final String root = selection.getSelectionRoot();
if (Strings.isNullOrEmpty(root)) {
throw new DrillRuntimeException("Selection root is null or empty" + root);
Expand All @@ -354,6 +362,7 @@ public static FileSelection createFromDirectories(final List<String> dirPaths, f
final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
FileSelection fileSel = new FileSelection(null, dirs, path.toString(), cacheFileRoot, false);
fileSel.setHadWildcard(selection.hadWildcard());
logger.info("FileSelection.createFromDirectories() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
return fileSel;
}

Expand Down Expand Up @@ -402,6 +411,14 @@ public String getCacheFileRoot() {
return cacheFileRoot;
}

public void setMetaContext(MetadataContext context) {
metaContext = context;
}

  /** Returns the metadata context attached to this selection, or null if none was set. */
  public MetadataContext getMetaContext() {
    return metaContext;
  }

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
Expand Down
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.dfs;

import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Maps;

/**
* A metadata context that holds state across multiple invocations of
* the Parquet metadata APIs.
*/
public class MetadataContext {

  /**
   * Map of directory path to whether that directory's modification time was
   * already checked. Note: the number of directories is typically a small
   * percentage of the number of files, so the memory footprint is expected to
   * be relatively small.
   */
  private final Map<String, Boolean> dirModifCheckMap = new HashMap<String, Boolean>();

  public MetadataContext() {
  }

  /** Marks the given directory as having had its modification time checked. */
  public void setStatus(String dir) {
    dirModifCheckMap.put(dir, true);
  }

  /** Clears the modification-time-checked status for the given directory. */
  public void clearStatus(String dir) {
    dirModifCheckMap.put(dir, false);
  }

  /**
   * Returns whether the given directory's modification time was already
   * checked; directories never recorded are reported as not checked.
   */
  public boolean getStatus(String dir) {
    // Single map lookup instead of the containsKey() + get() double lookup.
    Boolean checked = dirModifCheckMap.get(dir);
    return checked != null && checked.booleanValue();
  }

  /** Discards all recorded directory statuses. */
  public void clear() {
    dirModifCheckMap.clear();
  }

}


0 comments on commit 57dc9f4

Please sign in to comment.