From 67484d12a5a6b6a84b6a32363697b9c4011e8b17 Mon Sep 17 00:00:00 2001
From: Wellington Ramos Chevreuil
Date: Thu, 23 Nov 2023 11:24:31 +0000
Subject: [PATCH] HBASE-28176 PrefetchExecutor should stop once cache reaches capacity

Change-Id: I24fd463dca220e27811782b21a62d4ea6da872db
---
 .../hadoop/hbase/io/hfile/BlockCache.java     | 76 ++++++++++++++-
 .../hbase/io/hfile/CombinedBlockCache.java    | 51 ++++++++++
 .../hbase/io/hfile/HFilePreadReader.java      | 86 ++++++++++-------
 .../hbase/io/hfile/HFileReaderImpl.java       |  4 +-
 .../hbase/io/hfile/bucket/BucketCache.java    | 95 ++++++++++++++++++-
 .../hadoop/hbase/io/hfile/TestPrefetch.java   |  1 -
 .../hbase/io/hfile/TestPrefetchRSClose.java   | 12 +--
 .../io/hfile/TestPrefetchWithBucketCache.java | 91 +++++++++++++++---
 .../io/hfile/bucket/TestBucketCache.java      |  3 -
 .../bucket/TestBucketCachePersister.java      | 51 ++++++++--
 .../hfile/bucket/TestPrefetchPersistence.java |  3 -
 11 files changed, 397 insertions(+), 76 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 91ebaaabd422..a62ca853ca60 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -20,6 +20,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -166,7 +167,80 @@ default boolean isMetaBlock(BlockType blockType) {
   }
 
   /**
-   * Returns the list of fully cached files
+   * Notifies the cache implementation that the given file has been fully cached (all its blocks
+   * were added to the cache).
+   * @param fileName the file that has been completely cached.
+   */
+  default void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
+    long size) {
+    // noop
+  }
+
+  /**
+   * Notifies the cache implementation that the given file had a block evicted.
+   * @param fileName the file that had a block evicted.
+   */
+  default void notifyFileBlockEvicted(String fileName) {
+    // noop
+  }
+
+  /**
+   * Checks whether there's enough space left in the cache to accommodate the passed block. This
+   * method may not be overridden by all implementing classes. In such cases, the returned Optional
+   * will be empty. For subclasses implementing this logic, the returned Optional would contain the
+   * boolean value reflecting whether the passed block fits into the remaining cache space.
+   * @param block the block we want to check if fits into the cache.
+   * @return empty optional if this method is not supported, otherwise the returned optional
+   *         contains the boolean value informing if the block fits into the available cache space.
+   */
+  default Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
+    return Optional.empty();
+  }
+
+  /**
+   * Checks whether blocks for the passed file should be cached or not. This method may not be
+   * overridden by all implementing classes. In such cases, the returned Optional will be empty.
+   * For subclasses implementing this logic, the returned Optional would contain the boolean value
+   * reflecting whether the passed file should indeed be cached.
+   * @param fileName to check if it should be cached.
+   * @return empty optional if this method is not supported, otherwise the returned optional
+   *         contains the boolean value informing if the file should be cached.
+   */
+  default Optional<Boolean> shouldCacheFile(String fileName) {
+    return Optional.empty();
+  }
+
+  /**
+   * Checks whether the block for the passed key is already cached. This method may not be
+   * overridden by all implementing classes. In such cases, the returned Optional will be empty.
+   * For subclasses implementing this logic, the returned Optional would contain the boolean value
+   * reflecting whether the block for the passed key is already cached or not.
+   * @param key for the block we want to check if it's already in the cache.
+   * @return empty optional if this method is not supported, otherwise the returned optional
+   *         contains the boolean value informing if the block is already cached.
+   */
+  default Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
+    return Optional.empty();
+  }
+
+  /**
+   * Returns an Optional containing the size of the block related to the passed key. If the block
+   * is not in the cache, the returned optional will be empty. Also, this method may not be
+   * overridden by all implementing classes. In such cases, the returned Optional will be empty.
+   * @param key for the block we want to check if it's already in the cache.
+   * @return empty optional if this method is not supported, otherwise the returned optional
+   *         contains the size of the block related to the passed key.
+   */
+  default Optional<Integer> getBlockSize(BlockCacheKey key) {
+    return Optional.empty();
+  }
+
+  /**
+   * Returns an Optional containing the map of files that have been fully cached (all their blocks
+   * are present in the cache). This method may not be overridden by all implementing classes. In
+   * such cases, the returned Optional will be empty.
+   * @return empty optional if this method is not supported, otherwise the returned optional
+   *         contains a map of all files that have been fully cached.
    */
   default Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
     return Optional.empty();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 1e0fe7709292..427f1771e669 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -20,6 +20,9 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.util.Pair;
@@ -454,4 +457,52 @@ public FirstLevelBlockCache getFirstLevelCache() {
   public BlockCache getSecondLevelCache() {
     return l2Cache;
   }
+
+  @Override
+  public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
+    long size) {
+    l1Cache.getBlockCount();
+    l1Cache.notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
+    l2Cache.notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
+  }
+
+  @Override
+  public void notifyFileBlockEvicted(String fileName) {
+    l1Cache.notifyFileBlockEvicted(fileName);
+    l2Cache.notifyFileBlockEvicted(fileName);
+  }
+
+  @Override
+  public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
+    if (isMetaBlock(block.getBlockType())) {
+      return l1Cache.blockFitsIntoTheCache(block);
+    } else {
+      return l2Cache.blockFitsIntoTheCache(block);
+    }
+  }
+
+  @Override
+  public Optional<Boolean> shouldCacheFile(String fileName) {
+    Optional<Boolean> l1Result = l1Cache.shouldCacheFile(fileName);
+    Optional<Boolean> l2Result = l2Cache.shouldCacheFile(fileName);
+    final Mutable<Boolean> combinedResult = new MutableBoolean(true);
+    l1Result.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue()));
+    l2Result.ifPresent(b -> combinedResult.setValue(b && combinedResult.getValue()));
+    return Optional.of(combinedResult.getValue());
+  }
+
+  @Override
+  public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
+    boolean result =
+      l1Cache.isAlreadyCached(key).orElseGet(() -> l2Cache.isAlreadyCached(key).orElse(false));
+    return Optional.of(result);
+  }
+
+  @Override
+  public Optional<Integer> getBlockSize(BlockCacheKey key) {
+    Optional<Integer> l1Result = l1Cache.getBlockSize(key);
+    return l1Result.isPresent() ? l1Result : l2Cache.getBlockSize(key);
+  }
 }
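For reference, the sketch below (not part of the patch) shows how a caller can compose the new Optional-returning BlockCache defaults added above; an empty Optional means the implementation does not support the check. The PrefetchDecisions class and its method names are illustrative only, and the imports assume the hbase-server module is on the classpath.

    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
    import org.apache.hadoop.hbase.io.hfile.HFileBlock;

    // Illustrative helper, not part of the patch.
    public final class PrefetchDecisions {

      private PrefetchDecisions() {
      }

      /** Returns true if prefetch can skip this block because the cache already holds it. */
      static boolean canSkipBlock(BlockCache cache, BlockCacheKey key) {
        // An empty Optional means the cache doesn't track per-block state, so we
        // conservatively assume the block still has to be read and cached.
        return cache.isAlreadyCached(key).orElse(false) && cache.getBlockSize(key).orElse(0) > 0;
      }

      /** Returns true if prefetch should stop because the block no longer fits in the cache. */
      static boolean shouldStopPrefetch(BlockCache cache, HFileBlock block, boolean inMemoryFamily) {
        // Mirrors the HFilePreadReader change below: in-memory families are never interrupted.
        return !inMemoryFamily && !cache.blockFitsIntoTheCache(block).orElse(true);
      }
    }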
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 7cdbd5aff486..92f6a8169f32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -23,8 +23,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,17 +37,15 @@ public class HFilePreadReader extends HFileReaderImpl {
   public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
     Configuration conf) throws IOException {
     super(context, fileInfo, cacheConf, conf);
-    final MutableBoolean fileAlreadyCached = new MutableBoolean(false);
-    Optional<BucketCache> bucketCacheOptional =
-      BucketCache.getBucketCacheFromCacheConfig(cacheConf);
-    bucketCacheOptional.flatMap(BucketCache::getFullyCachedFiles).ifPresent(fcf -> {
-      fileAlreadyCached.setValue(fcf.get(path.getName()) == null ? false : true);
+    final MutableBoolean shouldCache = new MutableBoolean(true);
+
+    cacheConf.getBlockCache().ifPresent(cache -> {
+      Optional<Boolean> result = cache.shouldCacheFile(path.getName());
+      shouldCache.setValue(result.isPresent() ? result.get().booleanValue() : true);
     });
+
     // Prefetch file blocks upon open if requested
-    if (
-      cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()
-        && !fileAlreadyCached.booleanValue()
-    ) {
+    if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff() && shouldCache.booleanValue()) {
       PrefetchExecutor.request(path, new Runnable() {
         @Override
         public void run() {
@@ -70,34 +66,38 @@ public void run() {
           }
           // Don't use BlockIterator here, because it's designed to read load-on-open section.
           long onDiskSizeOfNextBlock = -1;
+          // if we are here, a block cache is present anyway
+          BlockCache cache = cacheConf.getBlockCache().get();
+          boolean interrupted = false;
+          int blockCount = 0;
+          int dataBlockCount = 0;
           while (offset < end) {
             if (Thread.interrupted()) {
               break;
             }
-            // BucketCache can be persistent and resilient to restarts, so we check first if the
-            // block exists on its in-memory index, if so, we just update the offset and move on
-            // to the next block without actually going read all the way to the cache.
-            if (bucketCacheOptional.isPresent()) {
-              BucketCache cache = bucketCacheOptional.get();
-              if (cache.getBackingMapValidated().get()) {
-                BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
-                BucketEntry entry = cache.getBackingMap().get(cacheKey);
-                if (entry != null) {
-                  cacheKey = new BlockCacheKey(name, offset);
-                  entry = cache.getBackingMap().get(cacheKey);
-                  if (entry == null) {
-                    LOG.debug("No cache key {}, we'll read and cache it", cacheKey);
-                  } else {
-                    offset += entry.getOnDiskSizeWithHeader();
-                    LOG.debug("Found cache key {}. Skipping prefetch, the block is already cached.",
-                      cacheKey);
-                    continue;
-                  }
-                } else {
-                  LOG.debug("No entry in the backing map for cache key {}", cacheKey);
-                }
-              }
-            }
+            // Some cache implementations can be persistent and resilient to restarts,
+            // so we check first if the block exists on its in-memory index; if so, we just
+            // update the offset and move on to the next block without actually going to read
+            // it all the way into the cache.
+            BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
+            if (cache.isAlreadyCached(cacheKey).orElse(false)) {
+              // Right now, isAlreadyCached is only supported by BucketCache, which should
+              // always cache data blocks.
+              int size = cache.getBlockSize(cacheKey).orElse(0);
+              if (size > 0) {
+                offset += size;
+                LOG.debug("Found block of size {} for cache key {}. "
+                  + "Skipping prefetch, the block is already cached.", size, cacheKey);
+                blockCount++;
+                dataBlockCount++;
+                continue;
+              } else {
+                LOG.debug("Found block for cache key {}, but couldn't get its size. "
+                  + "Maybe the cache implementation doesn't support it? "
+                  + "We'll need to read the block from cache or file system.", cacheKey);
+              }
+            } else {
+              LOG.debug("No entry in the backing map for cache key {}.", cacheKey);
+            }
             // Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
             // the internal-to-hfileblock thread local which holds the overread that gets the
@@ -106,8 +106,20 @@ public void run() {
             HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
               /* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
             try {
+              if (!cacheConf.isInMemory() && !cache.blockFitsIntoTheCache(block).orElse(true)) {
+                LOG.warn(
+                  "Interrupting prefetch for file {} because block {} of size {} "
+                    + "doesn't fit in the available cache space.",
+                  path, cacheKey, block.getOnDiskSizeWithHeader());
+                interrupted = true;
+                break;
+              }
               onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
               offset += block.getOnDiskSizeWithHeader();
+              blockCount++;
+              if (block.getBlockType().isData()) {
+                dataBlockCount++;
+              }
             } finally {
               // Ideally here the readBlock won't find the block in cache. We call this
               // readBlock so that block data is read from FS and cached in BC. we must call
@@ -115,8 +127,10 @@ public void run() {
               block.release();
             }
           }
-          final long fileSize = offset;
-          bucketCacheOptional.ifPresent(bc -> bc.fileCacheCompleted(path, fileSize));
+          if (!interrupted) {
+            cacheConf.getBlockCache().get().notifyFileCachingCompleted(path, blockCount,
+              dataBlockCount, offset);
+          }
         } catch (IOException e) {
           // IOExceptions are probably due to region closes (relocation, etc.)
           if (LOG.isTraceEnabled()) {
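As context for the change above, this prefetch path only runs when prefetch-on-open is enabled and a block cache is configured. A minimal configuration sketch follows (not part of the patch); the property names are the standard HBase ones, while the cache path and size are illustrative values.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Illustrative configuration sketch, not part of the patch.
    public class PrefetchConfigExample {
      public static Configuration prefetchEnabledConf() {
        Configuration conf = HBaseConfiguration.create();
        // Ask region servers to prefetch HFile blocks when a store file is opened.
        conf.setBoolean("hbase.rs.prefetchblocksonopen", true);
        // Back the block cache with a file based BucketCache so prefetched blocks go off-heap.
        conf.set("hbase.bucketcache.ioengine", "file:/tmp/bucket.cache");
        conf.setInt("hbase.bucketcache.size", 1024); // capacity in MB, illustrative value
        return conf;
      }
    }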
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index f31d202782fa..9d431428f376 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1360,9 +1360,9 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
 
     // Don't need the unpacked block back and we're storing the block in the cache compressed
     if (cacheOnly && cacheCompressed && cacheOnRead) {
-      LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
-      // Cache the block if necessary
       cacheConf.getBlockCache().ifPresent(cache -> {
+        LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
+        // Cache the block if necessary
         if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
           cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index f321d034bc6b..ba33d5e02c48 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -51,6 +51,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -1328,7 +1329,7 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
     }
     assert !cacheEnabled;
 
-    try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
+    try (FileInputStream in = new FileInputStream(persistenceFile)) {
       int pblen = ProtobufMagic.lengthOfPBMagic();
       byte[] pbuf = new byte[pblen];
       int read = in.read(pbuf);
@@ -1992,6 +1993,11 @@ public void clear() {
         re.getData().release();
       }
     }
+
+    public boolean hasBlocksForFile(String fileName) {
+      return delegate.keySet().stream().filter(key -> key.getHfileName().equals(fileName))
+        .findFirst().isPresent();
+    }
   }
 
   public Map<BlockCacheKey, BucketEntry> getBackingMap() {
@@ -2051,4 +2057,91 @@ public void fileCacheCompleted(Path filePath, long size) {
     regionCachedSizeMap.merge(regionName, size, (oldpf, fileSize) -> oldpf + fileSize);
   }
 
+  @Override
+  public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int dataBlockCount,
+    long size) {
+    // block eviction may be happening in the background as prefetch runs,
+    // so we need to count all blocks for this file in the backing map under
+    // a read lock for the block offset
+    final List<ReentrantReadWriteLock> locks = new ArrayList<>();
+    LOG.debug("Notifying caching completed for file {}, with total blocks {}", fileName,
+      dataBlockCount);
+    try {
+      final MutableInt count = new MutableInt();
+      LOG.debug("iterating over {} entries in the backing map", backingMap.size());
+      backingMap.entrySet().stream().forEach(entry -> {
+        if (entry.getKey().getHfileName().equals(fileName.getName())) {
+          LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
+            fileName, entry.getKey().getOffset());
+          ReentrantReadWriteLock lock = offsetLock.getLock(entry.getKey().getOffset());
+          lock.readLock().lock();
+          locks.add(lock);
+          if (backingMap.containsKey(entry.getKey())) {
+            count.increment();
+          }
+        }
+      });
+      // We may either place only data blocks on the BucketCache or all types of blocks
+      if (dataBlockCount == count.getValue() || totalBlockCount == count.getValue()) {
+        LOG.debug("File {} has now been fully cached.", fileName);
+        fileCacheCompleted(fileName, size);
+      } else {
+        LOG.debug(
+          "Prefetch executor completed for {}, but only {} blocks were cached. "
+            + "Total blocks for file: {}. Checking for blocks pending cache in cache writer queue.",
+          fileName, count.getValue(), dataBlockCount);
+        if (ramCache.hasBlocksForFile(fileName.getName())) {
+          LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms "
+            + "and try the verification again.", fileName);
+          Thread.sleep(100);
+          notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
+        } else {
+          LOG.info(
+            "We found only {} blocks cached from a total of {} for file {}, "
+              + "but no blocks pending caching. Maybe the cache is full?",
+            count, dataBlockCount, fileName);
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } finally {
+      for (ReentrantReadWriteLock lock : locks) {
+        lock.readLock().unlock();
+      }
+    }
+  }
+
+  @Override
+  public void notifyFileBlockEvicted(String fileName) {
+    fullyCachedFiles.remove(fileName);
+  }
+
+  @Override
+  public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
+    long currentUsed = bucketAllocator.getUsedSize();
+    boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize();
+    return Optional.of(result);
+  }
+
+  @Override
+  public Optional<Boolean> shouldCacheFile(String fileName) {
+    // if we don't have the file in fullyCachedFiles, we should cache it
+    return Optional.of(!fullyCachedFiles.containsKey(fileName));
+  }
+
+  @Override
+  public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
+    return Optional.of(getBackingMap().containsKey(key));
+  }
+
+  @Override
+  public Optional<Integer> getBlockSize(BlockCacheKey key) {
+    BucketEntry entry = backingMap.get(key);
+    if (entry == null) {
+      return Optional.empty();
+    } else {
+      return Optional.of(entry.getOnDiskSizeWithHeader());
+    }
+  }
 }
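The retry branch in notifyFileCachingCompleted above is essentially a wait-and-recheck loop: count the file's blocks in the backing map, and if the count is short while the writer queue still holds blocks for the file, sleep briefly and check again. A standalone sketch of that pattern follows (not part of the patch); countCachedBlocks and hasPendingBlocks are hypothetical suppliers standing in for the backing map scan and the ramCache.hasBlocksForFile check.

    import java.util.function.BooleanSupplier;
    import java.util.function.IntSupplier;

    // Illustrative sketch of the wait-and-recheck pattern, not part of the patch.
    public class CacheCompletionCheck {

      /**
       * Returns true once all expected blocks are cached, retrying while blocks are still
       * queued for the cache writers, and false if the count stalls with nothing pending
       * (for example because the cache is already full).
       */
      static boolean waitForFullCaching(IntSupplier countCachedBlocks,
        BooleanSupplier hasPendingBlocks, int expectedBlocks) throws InterruptedException {
        while (true) {
          if (countCachedBlocks.getAsInt() >= expectedBlocks) {
            return true;
          }
          if (!hasPendingBlocks.getAsBoolean()) {
            return false; // nothing left in flight; blocks were likely evicted or skipped
          }
          Thread.sleep(100); // same back-off the patch uses before re-checking
        }
      }
    }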
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index cdf9faf2490b..b58319179c56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -237,7 +237,6 @@ public void testPrefetchCompressed() throws Exception {
     Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
     readStoreFileCacheOnly(storeFile);
     conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
-
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
index 879d8566c82e..a5023d5da004 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
@@ -19,7 +19,6 @@
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -67,7 +66,7 @@ public class TestPrefetchRSClose {
   MiniZooKeeperCluster zkCluster;
   SingleProcessHBaseCluster cluster;
   StartTestingClusterOption option =
-    StartTestingClusterOption.builder().numRegionServers(2).build();
+    StartTestingClusterOption.builder().numRegionServers(1).build();
 
   @Before
   public void setup() throws Exception {
@@ -81,7 +80,6 @@ public void setup() throws Exception {
     conf.set("hbase.bucketcache.persistent.path", testDir + "/bucket.persistence");
     zkCluster = TEST_UTIL.startMiniZKCluster();
     cluster = TEST_UTIL.startMiniHBaseCluster(option);
-    assertEquals(2, cluster.getRegionServerThreads().size());
     cluster.setConf(conf);
   }
 
@@ -117,9 +115,7 @@ public void testPrefetchPersistence() throws Exception {
 
     // Default interval for cache persistence is 1000ms. So after 1000ms, both the persistence files
    // should exist.
-    HRegionServer regionServingRS = cluster.getRegionServer(1).getRegions(tableName).size() == 1
-      ? cluster.getRegionServer(1)
-      : cluster.getRegionServer(0);
+    HRegionServer regionServingRS = cluster.getRegionServer(0);
 
     Admin admin = TEST_UTIL.getAdmin();
     List<String> cachedFilesList = admin.getCachedFilesList(regionServingRS.getServerName());
@@ -133,10 +129,6 @@ public void testPrefetchPersistence() throws Exception {
     LOG.info("Stopped Region Server 0.");
     Thread.sleep(1000);
     assertTrue(new File(testDir + "/bucket.persistence").exists());
-
-    // Start the RS and validate
-    cluster.startRegionServer();
-    assertFalse(new File(testDir + "/bucket.persistence").exists());
   }
 
   @After
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
index 93f09231f740..446298235471 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
@@ -19,6 +19,7 @@
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -38,12 +39,16 @@
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -72,8 +77,6 @@ public class TestPrefetchWithBucketCache {
   private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length - 2;
   private static final int DATA_BLOCK_SIZE = 2048;
-  private static final int NUM_KV = 100;
-
   private Configuration conf;
   private CacheConfig cacheConf;
   private FileSystem fs;
@@ -87,9 +90,6 @@ public void setUp() throws IOException {
     File testDir = new File(name.getMethodName());
     testDir.mkdir();
     conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() + "/bucket.cache");
-    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
-    blockCache = BlockCacheFactory.createBlockCache(conf);
-    cacheConf = new CacheConfig(conf, blockCache);
   }
 
   @After
@@ -102,7 +102,10 @@ public void tearDown() {
 
   @Test
   public void testPrefetchDoesntOverwork() throws Exception {
-    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork");
+    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
+    cacheConf = new CacheConfig(conf, blockCache);
+    Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork", 100);
     // Prefetches the file blocks
     LOG.debug("First read should prefetch the blocks.");
     readStoreFile(storeFile);
@@ -123,7 +126,7 @@ public void testPrefetchDoesntOverwork() throws Exception {
     BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
     LOG.debug("removing block {}", key);
     bc.getBackingMap().remove(key);
-    bc.getFullyCachedFiles().ifPresent(fcf -> fcf.remove(storeFile.getName()));
+    bc.getFullyCachedFiles().get().remove(storeFile.getName());
     assertTrue(snapshot.size() > bc.getBackingMap().size());
     LOG.debug("Third read should prefetch again, as we removed one block for the file.");
     readStoreFile(storeFile);
@@ -131,6 +134,57 @@ public void testPrefetchDoesntOverwork() throws Exception {
     assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime());
   }
 
+  @Test
+  public void testPrefetchInterruptOnCapacity() throws Exception {
+    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
+    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
+    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
+    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
+    cacheConf = new CacheConfig(conf, blockCache);
+    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
+    // Prefetches the file blocks
+    LOG.debug("First read should prefetch the blocks.");
+    createReaderAndWaitForPrefetchInterruption(storeFile);
+    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
+    long evictionsFirstPrefetch = bc.getStats().getEvictionCount();
+    LOG.debug("evictions after first prefetch: {}", bc.getStats().getEvictionCount());
+    HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
+    LOG.debug("evictions after second prefetch: {}", bc.getStats().getEvictionCount());
+    assertTrue((bc.getStats().getEvictionCount() - evictionsFirstPrefetch) < 10);
+    HFileScanner scanner = reader.getScanner(conf, true, true);
+    scanner.seekTo();
+    while (scanner.next()) {
+      // do a full scan to force some evictions
+      LOG.trace("Iterating the full scan to evict some blocks");
+    }
+    scanner.close();
+    LOG.debug("evictions after scanner: {}", bc.getStats().getEvictionCount());
+    // The scanner should have triggered at least 3x the evictions of the prefetch,
+    // as we try to cache each block without interruption.
+    assertTrue(bc.getStats().getEvictionCount() > evictionsFirstPrefetch);
+  }
+
+  @Test
+  public void testPrefetchDoesntInterruptInMemoryOnCapacity() throws Exception {
+    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
+    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
+    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
+    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
+    ColumnFamilyDescriptor family =
+      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
+    cacheConf = new CacheConfig(conf, family, blockCache, ByteBuffAllocator.HEAP);
+    Path storeFile = writeStoreFile("testPrefetchDoesntInterruptInMemoryOnCapacity", 10000);
+    // Prefetches the file blocks
+    LOG.debug("First read should prefetch the blocks.");
+    createReaderAndWaitForPrefetchInterruption(storeFile);
+    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
+    assertTrue(bc.getStats().getEvictedCount() > 200);
+  }
+
   private void readStoreFile(Path storeFilePath) throws Exception {
     readStoreFile(storeFilePath, (r, o) -> {
       HFileBlock block = null;
@@ -170,18 +224,33 @@ private void readStoreFile(Path storeFilePath,
     }
   }
 
-  private Path writeStoreFile(String fname) throws IOException {
+  private HFile.Reader createReaderAndWaitForPrefetchInterruption(Path storeFilePath)
+    throws Exception {
+    // Open the file
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
+
+    while (!reader.prefetchComplete()) {
+      // Sleep for a bit
+      Thread.sleep(1000);
+    }
+    assertEquals(0, BucketCache.getBucketCacheFromCacheConfig(cacheConf).get().getFullyCachedFiles()
+      .get().size());
+
+    return reader;
+  }
+
+  private Path writeStoreFile(String fname, int numKVs) throws IOException {
     HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
-    return writeStoreFile(fname, meta);
+    return writeStoreFile(fname, meta, numKVs);
   }
 
-  private Path writeStoreFile(String fname, HFileContext context) throws IOException {
+  private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException {
     Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
     StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
       .withOutputDir(storeFileParentDir).withFileContext(context).build();
     Random rand = ThreadLocalRandom.current();
     final int rowLen = 32;
-    for (int i = 0; i < NUM_KV; ++i) {
+    for (int i = 0; i < numKVs; ++i) {
       byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
       byte[] v = RandomKeyValueUtil.randomValue(rand);
       int cfLen = rand.nextInt(k.length - rowLen + 1);
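The sleep loop in createReaderAndWaitForPrefetchInterruption above could also be written with the Waiter utility the test class already imports; a sketch follows (not part of the patch), with an illustrative timeout.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Waiter;
    import org.apache.hadoop.hbase.io.hfile.HFile;

    // Illustrative alternative to the sleep loop, not part of the patch.
    public class PrefetchWait {
      static void waitForPrefetch(Configuration conf, HFile.Reader reader) {
        // Poll until the prefetch thread for this reader's file has finished or was interrupted.
        Waiter.waitFor(conf, 30000, () -> reader.prefetchComplete());
      }
    }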
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 0cbafedc7c53..6a9b5bf382a6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -363,7 +363,6 @@ private void testRetrievalUtils(Path testDir, String ioEngineName)
       assertTrue(new File(persistencePath).exists());
       bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
         constructedBlockSizes, writeThreads, writerQLen, persistencePath);
-      assertFalse(new File(persistencePath).exists());
       assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
     } finally {
       if (bucketCache != null) {
@@ -820,7 +819,6 @@ public void testFreeBucketEntryRestoredFromFile() throws Exception {
       // restore cache from file
       bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
         constructedBlockSizes, writeThreads, writerQLen, persistencePath);
-      assertFalse(new File(persistencePath).exists());
       assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
 
       for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
@@ -877,7 +875,6 @@ public void testBlockAdditionWaitWhenCache() throws Exception {
       // restore cache from file
       bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
         constructedBlockSizes, writeThreads, writerQLen, persistencePath);
-      assertFalse(new File(persistencePath).exists());
       assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
 
       for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
index bd69f28e1eac..f6d3efa9015d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -22,6 +22,8 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +40,7 @@
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
 import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -131,18 +134,50 @@ public void testPrefetchPersistenceCrashNegative() throws Exception {
 
   @Test
   public void testPrefetchListUponBlockEviction() throws Exception {
     Configuration conf = setupBucketCacheConfig(200);
-    BucketCache bucketCache1 = setupBucketCache(conf);
-    CacheConfig cacheConf = new CacheConfig(conf, bucketCache1);
+    BucketCache bucketCache = setupBucketCache(conf);
+    CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
+    FileSystem fs = HFileSystem.get(conf);
+    // Load Blocks in cache
+    Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
+    readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
+    int retries = 0;
+    while (!bucketCache.fullyCachedFiles.containsKey(storeFile.getName()) && retries < 5) {
+      Thread.sleep(500);
+      retries++;
+    }
+    assertTrue(retries < 5);
+    BlockCacheKey bucketCacheKey = bucketCache.backingMap.entrySet().iterator().next().getKey();
+    // Evict Blocks from cache
+    bucketCache.evictBlock(bucketCacheKey);
+    assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
+    cleanupBucketCache(bucketCache);
+  }
+
+  @Test
+  public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception {
+    Configuration conf = setupBucketCacheConfig(200);
+    BucketCache bucketCache = setupBucketCache(conf);
+    CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Blocks in cache
     Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
-    readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1);
-    Thread.sleep(500);
+    HFile.createReader(fs, storeFile, cacheConf, true, conf);
+    while (bucketCache.backingMap.size() == 0) {
+      Thread.sleep(10);
+    }
+    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
+      bucketCache.backingMap.entrySet().iterator();
     // Evict Blocks from cache
-    assertTrue(bucketCache1.fullyCachedFiles.containsKey(storeFile.getName()));
-    BlockCacheKey bucketCacheKey = bucketCache1.backingMap.entrySet().iterator().next().getKey();
-    bucketCache1.evictBlock(bucketCacheKey);
-    assertFalse(bucketCache1.fullyCachedFiles.containsKey(storeFile.getName()));
+    bucketCache.evictBlock(it.next().getKey());
+    bucketCache.evictBlock(it.next().getKey());
+    int retries = 0;
+    while (!PrefetchExecutor.isCompleted(storeFile) && retries < 5) {
+      Thread.sleep(500);
+      retries++;
+    }
+    assertTrue(retries < 5);
+    assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
+    cleanupBucketCache(bucketCache);
   }
 
   public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, CacheConfig cacheConf,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
index f15874bc61c2..035cdc3f887e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
@@ -110,7 +110,6 @@ public void setup() throws IOException {
 
   @Test
   public void testPrefetchPersistence() throws Exception {
-
     bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
       constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
       testDir + "/bucket.persistence", 60 * 1000, conf);
@@ -133,8 +132,6 @@ public void testPrefetchPersistence() throws Exception {
       constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
       testDir + "/bucket.persistence", 60 * 1000, conf);
     cacheConf = new CacheConfig(conf, bucketCache);
-    assertFalse(new File(testDir + "/bucket.persistence").exists());
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertTrue(usedSize != 0);
     readStoreFile(storeFile, 0);
     readStoreFile(storeFile2, 0);
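Operationally, the files a region server considers fully cached can be listed through the Admin API exercised by TestPrefetchRSClose above. The sketch below is illustrative and not part of the patch; only getCachedFilesList is taken from the tests, while the surrounding connection handling is an assumption.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    // Illustrative report of fully cached files per region server, not part of the patch.
    public class CachedFilesReport {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
          Admin admin = connection.getAdmin()) {
          for (ServerName server : admin.getRegionServers()) {
            // Files listed here have had all of their blocks persisted into the block cache.
            List<String> cached = admin.getCachedFilesList(server);
            System.out.println(server + " fully cached files: " + cached);
          }
        }
      }
    }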