IMPALA-4840: Fix REFRESH performance regression.
The fix for IMPALA-4172 introduced a performance regression in
the REFRESH command. The regression stems from the fact that we
reload the block metadata of every valid data file without
checking whether it has changed since the last load. This caused
unnecessary metadata loads for unchanged files and thus
increased the runtime.

The fix makes the refresh codepath (and other operations that
use the same codepath, such as insert) reload the metadata of
only the modified files, by doing a listStatus() on the
partition directory and checking the last modified time of each
file. Without this patch, we relied on listFiles(), which
fetched the block locations regardless of whether a file had
changed and was significantly slower on unchanged tables. The
initial/invalidate metadata load still fetches the block
locations in bulk using listFiles(). A side effect of this
change is that refresh no longer picks up block location changes
after an HDFS block rebalance; we suggest using
"invalidate metadata" for that case, which loads the metadata
from scratch.
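
To illustrate the idea, here is a minimal sketch against the
standard Hadoop FileSystem API. This is not the actual catalog
code: the CachedFileMd type, the cachedByName map, and the
rebuildFileDescriptor() helper are hypothetical stand-ins for
the catalog's existing per-file state.

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

void refreshChangedFiles(Path partDir, Configuration conf,
    Map<String, CachedFileMd> cachedByName) throws IOException {
  FileSystem fs = partDir.getFileSystem(conf);
  // listStatus() returns FileStatus only; no block locations are fetched here.
  for (FileStatus stat : fs.listStatus(partDir)) {
    CachedFileMd cached = cachedByName.get(stat.getPath().getName());
    if (cached == null
        || cached.length != stat.getLen()
        || cached.mtime != stat.getModificationTime()) {
      // Only new or modified files pay for a block-location lookup.
      BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, stat.getLen());
      rebuildFileDescriptor(stat, locs);  // hypothetical helper
    }
  }
}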

Additionally, this commit enables the reuse of metadata during
table refresh (which was disabled in IMPALA-4172) to avoid
reloading the metadata from HMS every time.

Change-Id: I859b9fe93563ba886d0b5db6db42a14c88caada8
Reviewed-on: http://gerrit.cloudera.org:8080/6009
Reviewed-by: Dimitris Tsirogiannis <dtsirogiannis@cloudera.com>
Tested-by: Impala Public Jenkins
Bharath Vissapragada authored and Impala Public Jenkins committed Feb 16, 2017
1 parent bd1d445 commit 26eaa26
Showing 2 changed files with 110 additions and 32 deletions.
@@ -974,7 +974,7 @@ public Table reloadTable(Table tbl) throws CatalogException {
throw new TableLoadingException("Error loading metadata for table: " +
db.getName() + "." + tblName.getTable_name(), e);
}
-tbl.load(false, msClient.getHiveClient(), msTbl);
+tbl.load(true, msClient.getHiveClient(), msTbl);
}
tbl.setCatalogVersion(newCatalogVersion);
LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
fe/src/main/java/org/apache/impala/catalog/HdfsTable.java (140 changes: 109 additions & 31 deletions)
@@ -81,8 +81,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -311,28 +313,10 @@ private void loadBlockMetadata(Path dirPath,
FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(),
fileStatus.getModificationTime());
BlockLocation[] locations = fileStatus.getBlockLocations();
-String partPathDirName = partPathDir.toString();
-for (BlockLocation loc: locations) {
-Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
-// Enumerate all replicas of the block, adding any unknown hosts
-// to hostIndex_. We pick the network address from getNames() and
-// map it to the corresponding hostname from getHosts().
-List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
-loc.getNames().length);
-for (int i = 0; i < loc.getNames().length; ++i) {
-TNetworkAddress networkAddress =
-BlockReplica.parseLocation(loc.getNames()[i]);
-replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
-cachedHosts.contains(loc.getHosts()[i])));
-}
-FileBlock currentBlock =
-new FileBlock(loc.getOffset(), loc.getLength(), replicas);
-THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
-fd.addThriftFileBlock(tHdfsFileBlock);
-unknownDiskIdCount += loadDiskIds(loc, tHdfsFileBlock);
-}
+unknownDiskIdCount += setFdBlockMetadata(fd, locations);
if (LOG.isTraceEnabled()) {
-LOG.trace("Adding file md dir: " + partPathDirName + " file: " + fileName);
+LOG.trace("Adding file md dir: " + partPathDir.toString() + " file: " +
+fileName);
}
// Update the partitions' metadata that this file belongs to.
for (HdfsPartition partition: partitions) {
@@ -353,6 +337,35 @@ private void loadBlockMetadata(Path dirPath,
}
}

+/**
+* Sets the block metadata for FileDescriptor 'fd' using block location metadata
+* from 'locations'.
+*/
+private int setFdBlockMetadata(FileDescriptor fd, BlockLocation[] locations)
+throws IOException {
+int unknownFdDiskIds = 0;
+for (BlockLocation loc: locations) {
+Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
+// Enumerate all replicas of the block, adding any unknown hosts
+// to hostIndex_. We pick the network address from getNames() and
+// map it to the corresponding hostname from getHosts().
+List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
+loc.getNames().length);
+for (int i = 0; i < loc.getNames().length; ++i) {
+TNetworkAddress networkAddress =
+BlockReplica.parseLocation(loc.getNames()[i]);
+replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
+cachedHosts.contains(loc.getHosts()[i])));
+}
+FileBlock currentBlock =
+new FileBlock(loc.getOffset(), loc.getLength(), replicas);
+THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
+fd.addThriftFileBlock(tHdfsFileBlock);
+unknownFdDiskIds += loadDiskIds(loc, tHdfsFileBlock);
+}
+return unknownFdDiskIds;
+}

/**
* Loads the disk IDs for BlockLocation 'location' and its corresponding file block.
* HDFS API for BlockLocation returns a storageID UUID string for each disk
@@ -387,6 +400,20 @@ private int loadDiskIds(BlockLocation location, THdfsFileBlock fileBlock) {
return unknownDiskIdCount;
}

+/**
+* Synthesize the block metadata for a given HdfsPartition object. Should only
+* be called for FileSystems that do not support storage IDs.
+*/
+private void synthesizeBlockMetadata(FileSystem fs, HdfsPartition partition)
+throws IOException {
+Preconditions.checkState(!FileSystemUtil.supportsStorageIds(fs));
+HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
+Path partitionPath = partition.getLocationPath();
+partition.setFileDescriptors(new ArrayList<FileDescriptor>());
+partsByPath.put(partitionPath, Lists.newArrayList(partition));
+synthesizeBlockMetadata(fs, partitionPath, partsByPath);
+}

/**
* For filesystems that don't support BlockLocation API, synthesize file blocks
* by manually splitting the file range into fixed-size blocks. That way, scan
@@ -755,11 +782,62 @@ private void loadAllPartitions(
loadMetadataAndDiskIds(dirsToLoad, partsByPath);
}

-private void loadMetadataAndDiskIds(HdfsPartition partition) throws CatalogException {
-Path partDirPath = partition.getLocationPath();
-HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-partsByPath.put(partDirPath, Lists.newArrayList(partition));
-loadMetadataAndDiskIds(Lists.newArrayList(partDirPath), partsByPath);
+/**
+* Refreshes block metadata information for 'partition'. This method is optimized for
+* the case where the files in the partition have not changed dramatically. It first
+* uses a listStatus() call on the partition directory to detect files with changed
+* mtime and fetches their block locations using the getFileBlockLocations() method.
+* Our benchmarks suggest that the listStatus() call is much faster than the listFiles()
+* (up to ~40x faster in some cases). The initial table load still uses the listFiles()
+* on the data directory that fetches both the FileStatus as well as BlockLocations in
+* a single call.
+*/
+private void refreshFileMetadata(HdfsPartition partition) throws CatalogException {
+Path partDir = partition.getLocationPath();
+Preconditions.checkNotNull(partDir);
+try {
+FileSystem fs = partDir.getFileSystem(CONF);
+if (!fs.exists(partDir)) {
+partition.setFileDescriptors(new ArrayList<FileDescriptor>());
+return;
+}
+if (!FileSystemUtil.supportsStorageIds(fs)) {
+synthesizeBlockMetadata(fs, partition);
+return;
+}
+// Index the partition file descriptors by their file names for O(1) look ups.
+ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
+partition.getFileDescriptors(), new Function<FileDescriptor, String>() {
+public String apply(FileDescriptor desc) {
+return desc.getFileName();
+}
+});
+// Iterate through the current snapshot of the partition directory listing to
+// figure out files that were newly added/modified.
+List<FileDescriptor> newFileDescs = Lists.newArrayList();
+int newPartSizeBytes = 0;
+for (FileStatus fileStatus : fs.listStatus(partDir)) {
+if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
+String fileName = fileStatus.getPath().getName().toString();
+FileDescriptor fd = fileDescsByName.get(fileName);
+if (fd == null || partition.isMarkedCached() ||
+fd.getFileLength() != fileStatus.getLen() ||
+fd.getModificationTime() != fileStatus.getModificationTime()) {
+fd = new FileDescriptor(fileName, fileStatus.getLen(),
+fileStatus.getModificationTime());
+setFdBlockMetadata(fd,
+fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()));
+}
+newFileDescs.add(fd);
+newPartSizeBytes += fileStatus.getLen();
+}
+partition.setFileDescriptors(newFileDescs);
+numHdfsFiles_ += newFileDescs.size();
+totalHdfsBytes_ += newPartSizeBytes;
+} catch(IOException e) {
+throw new CatalogException("Error loading block metadata for partition " +
+partition.toString(), e);
+}
}

/**
@@ -772,7 +850,7 @@ private void loadMetadataAndDiskIds(List<Path> locations,
LOG.info(String.format(
"Loading file and block metadata for %s partitions from %s paths: %s",
partsByPath.size(), locations.size(), getFullName()));
-for (Path location: locations) { loadBlockMetadata(location, partsByPath); }
+for (Path location: locations) loadBlockMetadata(location, partsByPath);
LOG.info(String.format(
"Loaded file and block metadata for %s partitions from %s paths: %s",
partsByPath.size(), locations.size(), getFullName()));
@@ -831,7 +909,7 @@ public HdfsPartition createAndLoadPartition(StorageDescriptor storageDescriptor,
org.apache.hadoop.hive.metastore.api.Partition msPartition)
throws CatalogException {
HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition);
-loadMetadataAndDiskIds(hdfsPartition);
+refreshFileMetadata(hdfsPartition);
return hdfsPartition;
}

@@ -1119,8 +1197,8 @@ private void updateUnpartitionedTableFileMd() throws CatalogException {
addDefaultPartition(msTbl.getSd());
HdfsPartition part = createPartition(msTbl.getSd(), null);
addPartition(part);
-loadMetadataAndDiskIds(part);
if (isMarkedCached_) part.markCached();
+refreshFileMetadata(part);
}

/**
@@ -1436,7 +1514,7 @@ private void loadPartitionsFromMetastore(Set<String> partitionNames,
// WRITE_ONLY the table's access level should be NONE.
accessLevel_ = TAccessLevel.READ_ONLY;
}
-loadMetadataAndDiskIds(partition);
+refreshFileMetadata(partition);
}
}

@@ -1492,7 +1570,7 @@ private void loadPartitionFileMetadata(StorageDescriptor storageDescriptor,
numHdfsFiles_ -= partition.getNumFileDescriptors();
totalHdfsBytes_ -= partition.getSize();
Preconditions.checkState(numHdfsFiles_ >= 0 && totalHdfsBytes_ >= 0);
-loadMetadataAndDiskIds(partition);
+refreshFileMetadata(partition);
}

@Override