Skip to content

Commit

Permalink
HBASE-22606 BucketCache additional tests
Browse files Browse the repository at this point in the history
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Sakthi <sakthivel.azhaku@gmail.com>
  • Loading branch information
virajjasani authored and wchevreuil committed Jul 9, 2019
1 parent 605f8a1 commit 9ac9505
Showing 1 changed file with 149 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ public static Iterable<Object[]> data() {
final long capacitySize = 32 * 1024 * 1024;
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
String ioEngineName = "offheap";
String persistencePath = null;
private String ioEngineName = "offheap";

private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();

private static class MockedBucketCache extends BucketCache {

Expand All @@ -136,14 +137,27 @@ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
@Before
public void setup() throws IOException {
cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
constructedBlockSizes, writeThreads, writerQLen, null);
}

@After
public void tearDown() {
cache.shutdown();
}

/**
* Test Utility to create test dir and return name
*
* @return return name of created dir
* @throws IOException throws IOException
*/
private Path createAndGetTestDir() throws IOException {
final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
return testDir;
}


/**
* Return a random element from {@code a}.
*/
Expand Down Expand Up @@ -255,51 +269,128 @@ public void run() {

@Test
public void testRetrieveFromFile() throws Exception {
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);

Path testDir = createAndGetTestDir();
String ioEngineName = "file:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
HBASE_TESTING_UTILITY.cleanupTestDir();
}

@Test
public void testRetrieveFromMMap() throws Exception {
final Path testDir = createAndGetTestDir();
final String ioEngineName = "mmap:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
}

@Test
public void testRetrieveFromPMem() throws Exception {
final Path testDir = createAndGetTestDir();
final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);

HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
// Add blocks
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
HBASE_TESTING_UTILITY.cleanupTestDir();
}

private void testRetrievalUtils(Path testDir, String ioEngineName)
throws IOException, InterruptedException {
final String persistencePath = testDir + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
try {
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
} finally {
bucketCache.shutdown();
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
}

// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
// persist cache to file
bucketCache.shutdown();
assertTrue(new File(persistencePath).exists());
@Test
public void testRetrieveUnsupportedIOE() throws Exception {
try {
final Path testDir = createAndGetTestDir();
final String ioEngineName = testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
Assert.fail("Should have thrown IllegalArgumentException because of unsupported IOEngine!!");
} catch (IllegalArgumentException e) {
Assert.assertEquals("Don't understand io engine name for cache- prefix with file:, " +
"files:, mmap: or offheap", e.getMessage());
}
}

// reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k)
// so it can't restore cache from file
int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
@Test
public void testRetrieveFromMultipleFiles() throws Exception {
final Path testDirInitial = createAndGetTestDir();
final Path newTestDir = new HBaseTestingUtility().getDataTestDir();
HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(newTestDir);
String ioEngineName = new StringBuilder("files:").append(testDirInitial)
.append("/bucket1.cache").append(FileIOEngine.FILE_DELIMITER).append(newTestDir)
.append("/bucket2.cache").toString();
testRetrievalUtils(testDirInitial, ioEngineName);
int[] smallBucketSizes = new int[]{3 * 1024, 5 * 1024};
String persistencePath = testDirInitial + "/bucket.persistence";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
HBASE_TESTING_UTILITY.cleanupTestDir();
}

TEST_UTIL.cleanupTestDir();
@Test
public void testRetrieveFromFileWithoutPersistence() throws Exception {
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null);
try {
final Path testDir = createAndGetTestDir();
String ioEngineName = "file:" + testDir + "/bucket.cache";
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
for (HFileBlockPair block : blocks) {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
bucketCache.shutdown();
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null);
assertEquals(0, bucketCache.getAllocator().getUsedSize());
} finally {
bucketCache.shutdown();
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}

@Test
Expand All @@ -322,13 +413,32 @@ public void testGetPartitionSize() throws IOException {
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

validateGetPartitionSize(cache, 0.1f, 0.5f);
validateGetPartitionSize(cache, 0.7f, 0.5f);
validateGetPartitionSize(cache, 0.2f, 0.5f);
}

@Test
public void testCacheSizeCapacity() throws IOException {
// Test cache capacity (capacity / blockSize) < Integer.MAX_VALUE
validateGetPartitionSize(cache, BucketCache.DEFAULT_SINGLE_FACTOR,
BucketCache.DEFAULT_MIN_FACTOR);
Configuration conf = HBaseConfiguration.create();
conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
try {
new BucketCache(ioEngineName, Long.MAX_VALUE, 1, constructedBlockSizes, writeThreads,
writerQLen, null, 100, conf);
Assert.fail("Should have thrown IllegalArgumentException because of large cache capacity!");
} catch (IllegalArgumentException e) {
Assert.assertEquals("Cache capacity is too large, only support 32TB now", e.getMessage());
}
}

@Test
public void testValidBucketCacheConfigs() throws IOException {
Configuration conf = HBaseConfiguration.create();
Expand All @@ -340,7 +450,7 @@ public void testValidBucketCacheConfigs() throws IOException {
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);

BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);

assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
cache.getAcceptableFactor(), 0);
Expand Down Expand Up @@ -408,7 +518,7 @@ private void checkConfigValues(Configuration conf, Map<String, float[]> configMa
conf.setFloat(configName, configMap.get(configName)[i]);
}
BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath, 100, conf);
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
assertTrue("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
} catch (IllegalArgumentException e) {
assertFalse("Created BucketCache and expected it to succeed: " + expectSuccess[i] + ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
Expand Down

0 comments on commit 9ac9505

Please sign in to comment.