Skip to content

Commit

Permalink
HBASE-28176 PrefetchExecutor should stop once cache reaches capacity
Browse files Browse the repository at this point in the history
Change-Id: I24fd463dca220e27811782b21a62d4ea6da872db
  • Loading branch information
wchevreuil committed Nov 23, 2023
1 parent 1203c20 commit 67484d1
Show file tree
Hide file tree
Showing 11 changed files with 397 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -166,7 +167,80 @@ default boolean isMetaBlock(BlockType blockType) {
}

/**
 * Notifies the cache implementation that the given file has been fully cached (all its blocks
 * made into the cache).
 * @param fileName the file that has been completely cached.
 * @param totalBlockCount the total number of blocks that were cached for this file.
 * @param dataBlockCount how many of the cached blocks are data blocks.
 * @param size the total size, in bytes, of the cached file.
 */
default void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
long size) {
// noop
}

/**
 * Notifies the cache implementation that the given file had one of its blocks evicted from the
 * cache.
 * @param fileName the file that had a block evicted.
 */
default void notifyFileBlockEvicted(String fileName) {
// noop
}

/**
 * Checks whether there's enough space left in the cache to accommodate the passed block. This
 * method may not be overridden by all implementing classes. In such cases, the returned Optional
 * will be empty. For subclasses implementing this logic, the returned Optional would contain the
 * boolean value reflecting if the passed block fits into the remaining cache space available.
 * @param block the block we want to check if fits into the cache.
 * @return empty optional if this method is not supported, otherwise the returned optional
 *         contains the boolean value informing if the block fits into the cache available space.
 */
default Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
// Default is "unsupported"; implementations with capacity awareness should override.
return Optional.empty();
}

/**
 * Checks whether blocks for the passed file should be cached or not. This method may not be
 * overridden by all implementing classes. In such cases, the returned Optional will be empty. For
 * subclasses implementing this logic, the returned Optional would contain the boolean value
 * reflecting if the passed file should indeed be cached.
 * @param fileName the name of the file to check if it should be cached.
 * @return empty optional if this method is not supported, otherwise the returned optional
 *         contains the boolean value informing if the file should be cached.
 */
default Optional<Boolean> shouldCacheFile(String fileName) {
// Default is "unsupported"; callers should treat an empty result as "cache it".
return Optional.empty();
}

/**
 * Checks whether the block for the passed key is already cached. This method may not be
 * overridden by all implementing classes. In such cases, the returned Optional will be empty. For
 * subclasses implementing this logic, the returned Optional would contain the boolean value
 * reflecting if the block for the passed key is already cached or not.
 * @param key for the block we want to check if it's already in the cache.
 * @return empty optional if this method is not supported, otherwise the returned optional
 *         contains the boolean value informing if the block is already cached.
 */
default Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
// Default is "unsupported"; persistent caches (e.g. BucketCache) should override.
return Optional.empty();
}

/**
 * Returns an Optional containing the size of the block related to the passed key. If the block is
 * not in the cache, the returned optional will be empty. Also, this method may not be overridden
 * by all implementing classes. In such cases, the returned Optional will be empty.
 * @param key for the block we want to check if it's already in the cache.
 * @return empty optional if this method is not supported or the block is not cached, otherwise
 *         the returned optional contains the size, in bytes, of the cached block.
 */
default Optional<Integer> getBlockSize(BlockCacheKey key) {
// Default is "unsupported"; size-aware implementations should override.
return Optional.empty();
}

/**
* Returns an Optional containing the map of files that have been fully cached (all their blocks
* are present in the cache). This method may not be overridden by all implementing classes. In such
* cases, the returned Optional will be empty.
* @return empty optional if this method is not supported, otherwise the returned optional
* contains a map of all files that have been fully cached.
*/
default Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.Pair;
Expand Down Expand Up @@ -454,4 +457,52 @@ public FirstLevelBlockCache getFirstLevelCache() {
public BlockCache getSecondLevelCache() {
return l2Cache;
}

@Override
public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
long size) {
// Propagate the notification to both tiers: meta blocks may live in the L1 cache while
// data blocks live in the L2 cache, so each tier decides on its own what to do with it.
// (Removed a stray `l1Cache.getBlockCount();` call whose result was discarded — it had
// no effect and only obscured the intent of this method.)
l1Cache.notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
l2Cache.notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
}

@Override
public void notifyFileBlockEvicted(String fileName) {
// Bug fix: the original notified l1Cache twice and never informed the L2 cache of the
// eviction; both tiers must hear about it so their per-file accounting stays correct.
l1Cache.notifyFileBlockEvicted(fileName);
l2Cache.notifyFileBlockEvicted(fileName);
}

@Override
public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
// Meta blocks are kept in the first-level cache, everything else in the second level,
// so pick the tier the block would be stored in and ask it directly.
BlockCache targetTier = isMetaBlock(block.getBlockType()) ? l1Cache : l2Cache;
return targetTier.blockFitsIntoTheCache(block);
}

@Override
public Optional<Boolean> shouldCacheFile(String fileName) {
// Cache the file only if neither tier vetoes it. A tier returning an empty optional
// means it doesn't implement the check, which counts as "no objection" (true).
boolean l1Allows = l1Cache.shouldCacheFile(fileName).orElse(true);
boolean l2Allows = l2Cache.shouldCacheFile(fileName).orElse(true);
return Optional.of(l1Allows && l2Allows);
}

@Override
public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
// Consult the first-level cache first; fall back to the second level only when L1 has
// no answer. A block unknown to both tiers is reported as "not cached" (false), never
// as an empty optional.
Optional<Boolean> l1Answer = l1Cache.isAlreadyCached(key);
if (l1Answer.isPresent()) {
return Optional.of(l1Answer.get());
}
return Optional.of(l2Cache.isAlreadyCached(key).orElse(false));
}

@Override
public Optional<Integer> getBlockSize(BlockCacheKey key) {
// Prefer the first-level cache's answer when it has one; otherwise defer to L2.
Optional<Integer> l1Size = l1Cache.getBlockSize(key);
if (l1Size.isPresent()) {
return l1Size;
}
return l2Cache.getBlockSize(key);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,17 +37,15 @@ public class HFilePreadReader extends HFileReaderImpl {
public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, conf);
final MutableBoolean fileAlreadyCached = new MutableBoolean(false);
Optional<BucketCache> bucketCacheOptional =
BucketCache.getBucketCacheFromCacheConfig(cacheConf);
bucketCacheOptional.flatMap(BucketCache::getFullyCachedFiles).ifPresent(fcf -> {
fileAlreadyCached.setValue(fcf.get(path.getName()) == null ? false : true);
final MutableBoolean shouldCache = new MutableBoolean(true);

cacheConf.getBlockCache().ifPresent(cache -> {
Optional<Boolean> result = cache.shouldCacheFile(path.getName());
shouldCache.setValue(result.isPresent() ? result.get().booleanValue() : true);
});

// Prefetch file blocks upon open if requested
if (
cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()
&& !fileAlreadyCached.booleanValue()
) {
if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff() && shouldCache.booleanValue()) {
PrefetchExecutor.request(path, new Runnable() {
@Override
public void run() {
Expand All @@ -70,34 +66,38 @@ public void run() {
}
// Don't use BlockIterator here, because it's designed to read load-on-open section.
long onDiskSizeOfNextBlock = -1;
// if we are here, block cache is present anyways
BlockCache cache = cacheConf.getBlockCache().get();
boolean interrupted = false;
int blockCount = 0;
int dataBlockCount = 0;
while (offset < end) {
if (Thread.interrupted()) {
break;
}
// BucketCache can be persistent and resilient to restarts, so we check first if the
// block exists on its in-memory index, if so, we just update the offset and move on
// to the next block without actually going read all the way to the cache.
if (bucketCacheOptional.isPresent()) {
BucketCache cache = bucketCacheOptional.get();
if (cache.getBackingMapValidated().get()) {
BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
BucketEntry entry = cache.getBackingMap().get(cacheKey);
if (entry != null) {
cacheKey = new BlockCacheKey(name, offset);
entry = cache.getBackingMap().get(cacheKey);
if (entry == null) {
LOG.debug("No cache key {}, we'll read and cache it", cacheKey);
} else {
offset += entry.getOnDiskSizeWithHeader();
LOG.debug(
"Found cache key {}. Skipping prefetch, the block is already cached.",
cacheKey);
continue;
}
} else {
LOG.debug("No entry in the backing map for cache key {}", cacheKey);
}
// Some cache implementations can be persistent and resilient to restarts,
// so we check first if the block exists on its in-memory index, if so, we just
// update the offset and move on to the next block without actually going read all
// the way to the cache.
BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
if (cache.isAlreadyCached(cacheKey).orElse(false)) {
// Right now, isAlreadyCached is only supported by BucketCache, which should
// always cache data blocks.
int size = cache.getBlockSize(cacheKey).orElse(0);
if (size > 0) {
offset += size;
LOG.debug("Found block of size {} for cache key {}. "
+ "Skipping prefetch, the block is already cached.", size, cacheKey);
blockCount++;
dataBlockCount++;
continue;
} else {
LOG.debug("Found block for cache key {}, but couldn't get its size. "
+ "Maybe the cache implementation doesn't support it? "
+ "We'll need to read the block from cache or file system. ", cacheKey);
}
} else {
LOG.debug("No entry in the backing map for cache key {}. ", cacheKey);
}
// Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
// the internal-to-hfileblock thread local which holds the overread that gets the
Expand All @@ -106,17 +106,31 @@ public void run() {
HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
/* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
try {
if (!cacheConf.isInMemory() && !cache.blockFitsIntoTheCache(block).orElse(true)) {
LOG.warn(
"Interrupting prefetch for file {} because block {} of size {} "
+ "doesn't fit in the available cache space.",
path, cacheKey, block.getOnDiskSizeWithHeader());
interrupted = true;
break;
}
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
blockCount++;
if (block.getBlockType().isData()) {
dataBlockCount++;
}
} finally {
// Ideally here the readBlock won't find the block in cache. We call this
// readBlock so that block data is read from FS and cached in BC. we must call
// returnBlock here to decrease the reference count of block.
block.release();
}
}
final long fileSize = offset;
bucketCacheOptional.ifPresent(bc -> bc.fileCacheCompleted(path, fileSize));
if (!interrupted) {
cacheConf.getBlockCache().get().notifyFileCachingCompleted(path, blockCount,
dataBlockCount, offset);
}
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
if (LOG.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1360,9 +1360,9 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo

// Don't need the unpacked block back and we're storing the block in the cache compressed
if (cacheOnly && cacheCompressed && cacheOnRead) {
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
// Cache the block if necessary
if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
}
Expand Down
Loading

0 comments on commit 67484d1

Please sign in to comment.