Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27225 Add BucketAllocator bucket size statistic logging #4637

Merged
merged 4 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,14 @@ final class BucketSizeInfo {
// Free bucket means it has space to allocate a block;
// Completely free bucket means it has no block.
private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
private LongAdder fragmentation;
private int sizeIndex;

BucketSizeInfo(int sizeIndex) {
bucketList = new LinkedMap();
freeBuckets = new LinkedMap();
completelyFreeBuckets = new LinkedMap();
fragmentation = new LongAdder();
this.sizeIndex = sizeIndex;
}

Expand All @@ -193,7 +195,7 @@ public int sizeIndex() {
* Find a bucket to allocate a block
* @return the offset in the IOEngine
*/
public long allocateBlock() {
public long allocateBlock(int originalSize) {
Bucket b = null;
if (freeBuckets.size() > 0) {
// Use up an existing one first...
Expand All @@ -206,6 +208,7 @@ public long allocateBlock() {
if (b == null) return -1;
long result = b.allocate();
blockAllocated(b);
fragmentation.add(bucketSizes[sizeIndex] - originalSize);
return result;
}

Expand Down Expand Up @@ -236,23 +239,36 @@ private synchronized void removeBucket(Bucket b) {
completelyFreeBuckets.remove(b);
}

public void freeBlock(Bucket b, long offset) {
public void freeBlock(Bucket b, long offset, int length) {
assert bucketList.containsKey(b);
// else we shouldn't have anything to free...
assert (!completelyFreeBuckets.containsKey(b));
b.free(offset);
fragmentation.add(-1 * (bucketSizes[sizeIndex] - length));
if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
}

public synchronized IndexStatistics statistics() {
long free = 0, used = 0;
int full = 0;
for (Object obj : bucketList.keySet()) {
Bucket b = (Bucket) obj;
free += b.freeCount();
used += b.usedCount();
if (!b.hasFreeSpace()) {
full++;
}
}
return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
int bucketObjectSize = bucketSizes[sizeIndex];
// this is most likely to always be 1 or 0
int fillingBuckets = Math.max(0, freeBuckets.size() - completelyFreeBuckets.size());
// if bucket capacity is not perfectly divisible by a bucket's object size, there will
// be some left over per bucket. for some object sizes this may be large enough to be
// non-trivial and worth tuning by choosing a more divisible object size.
long wastedBytes = (bucketCapacity % bucketObjectSize) * (full + fillingBuckets);
return new IndexStatistics(free, used, bucketObjectSize, full, completelyFreeBuckets.size(),
wastedBytes, fragmentation.sum());
}

@Override
Expand Down Expand Up @@ -434,7 +450,7 @@ public synchronized long allocateBlock(int blockSize)
+ "; adjust BucketCache sizes " + BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY
+ " to accomodate if size seems reasonable and you want it cached.");
}
long offset = bsi.allocateBlock();
long offset = bsi.allocateBlock(blockSize);

// Ask caller to free up space and try again!
if (offset < 0) throw new CacheFullException(blockSize, bsi.sizeIndex());
Expand All @@ -455,11 +471,11 @@ private Bucket grabGlobalCompletelyFreeBucket() {
* @param offset block's offset
* @return size freed
*/
public synchronized int freeBlock(long offset) {
public synchronized int freeBlock(long offset, int length) {
int bucketNo = (int) (offset / bucketCapacity);
assert bucketNo >= 0 && bucketNo < buckets.length;
Bucket targetBucket = buckets[bucketNo];
bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset, length);
usedSize -= targetBucket.getItemAllocationSize();
return targetBucket.getItemAllocationSize();
}
Expand All @@ -479,7 +495,8 @@ public int sizeOfAllocation(long offset) {
}

static class IndexStatistics {
private long freeCount, usedCount, itemSize, totalCount;
private long freeCount, usedCount, itemSize, totalCount, wastedBytes, fragmentationBytes;
private int fullBuckets, completelyFreeBuckets;

public long freeCount() {
return freeCount;
Expand Down Expand Up @@ -509,46 +526,84 @@ public long itemSize() {
return itemSize;
}

public IndexStatistics(long free, long used, long itemSize) {
setTo(free, used, itemSize);
public int fullBuckets() {
return fullBuckets;
}

public int completelyFreeBuckets() {
return completelyFreeBuckets;
}

public long wastedBytes() {
return wastedBytes;
}

public long fragmentationBytes() {
return fragmentationBytes;
}

public IndexStatistics(long free, long used, long itemSize, int fullBuckets,
int completelyFreeBuckets, long wastedBytes, long fragmentationBytes) {
setTo(free, used, itemSize, fullBuckets, completelyFreeBuckets, wastedBytes,
fragmentationBytes);
}

public IndexStatistics() {
setTo(-1, -1, 0);
setTo(-1, -1, 0, 0, 0, 0, 0);
}

public void setTo(long free, long used, long itemSize) {
public void setTo(long free, long used, long itemSize, int fullBuckets,
int completelyFreeBuckets, long wastedBytes, long fragmentationBytes) {
this.itemSize = itemSize;
this.freeCount = free;
this.usedCount = used;
this.totalCount = free + used;
this.fullBuckets = fullBuckets;
this.completelyFreeBuckets = completelyFreeBuckets;
this.wastedBytes = wastedBytes;
this.fragmentationBytes = fragmentationBytes;
}
}

public Bucket[] getBuckets() {
return this.buckets;
}

void logStatistics() {
void logDebugStatistics() {
if (!LOG.isDebugEnabled()) {
return;
}

IndexStatistics total = new IndexStatistics();
IndexStatistics[] stats = getIndexStatistics(total);
LOG.info("Bucket allocator statistics follow:\n");
LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes=" + total.usedBytes()
+ "; total bytes=" + total.totalBytes());
LOG.debug("Bucket allocator statistics follow:");
LOG.debug(
" Free bytes={}; used bytes={}; total bytes={}; wasted bytes={}; fragmentation bytes={}; completelyFreeBuckets={}",
total.freeBytes(), total.usedBytes(), total.totalBytes(), total.wastedBytes(),
total.fragmentationBytes(), total.completelyFreeBuckets());
for (IndexStatistics s : stats) {
LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount() + "; free="
+ s.freeCount() + "; total=" + s.totalCount());
LOG.debug(
" Object size {}; used={}; free={}; total={}; wasted bytes={}; fragmentation bytes={}, full buckets={}",
s.itemSize(), s.usedCount(), s.freeCount(), s.totalCount(), s.wastedBytes(),
s.fragmentationBytes(), s.fullBuckets());
}
}

IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
IndexStatistics[] stats = getIndexStatistics();
long totalfree = 0, totalused = 0;
long totalfree = 0, totalused = 0, totalWasted = 0, totalFragmented = 0;
int fullBuckets = 0, completelyFreeBuckets = 0;

for (IndexStatistics stat : stats) {
totalfree += stat.freeBytes();
totalused += stat.usedBytes();
totalWasted += stat.wastedBytes();
totalFragmented += stat.fragmentationBytes();
fullBuckets += stat.fullBuckets();
completelyFreeBuckets += stat.completelyFreeBuckets();
}
grandTotal.setTo(totalfree, totalused, 1);
grandTotal.setTo(totalfree, totalused, 1, fullBuckets, completelyFreeBuckets, totalWasted,
totalFragmented);
return stats;
}

Expand All @@ -559,13 +614,6 @@ IndexStatistics[] getIndexStatistics() {
return stats;
}

public long freeBlock(long freeList[]) {
long sz = 0;
for (int i = 0; i < freeList.length; ++i)
sz += freeBlock(freeList[i]);
return sz;
}

public int getBucketIndex(long offset) {
return (int) (offset / bucketCapacity);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre
* {@link BucketEntry#refCnt} becoming 0.
*/
void freeBucketEntry(BucketEntry bucketEntry) {
bucketAllocator.freeBlock(bucketEntry.offset());
bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
realCacheSize.add(-1 * bucketEntry.getLength());
}

Expand Down Expand Up @@ -738,6 +738,8 @@ public void logStats() {
+ cacheStats.getEvictedCount() + ", " + "evictedPerRun=" + cacheStats.evictedPerEviction()
+ ", " + "allocationFailCount=" + cacheStats.getAllocationFailCount());
cacheStats.reset();

bucketAllocator.logDebugStatistics();
}

public long getRealCacheSize() {
Expand Down Expand Up @@ -1119,8 +1121,9 @@ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws Inte
checkIOErrorIsTolerated();
// Since we failed sync, free the blocks in bucket allocator
for (int i = 0; i < entries.size(); ++i) {
if (bucketEntries[i] != null) {
bucketAllocator.freeBlock(bucketEntries[i].offset());
BucketEntry bucketEntry = bucketEntries[i];
if (bucketEntry != null) {
bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
bucketEntries[i] = null;
}
}
Expand Down Expand Up @@ -1538,7 +1541,7 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a
succ = true;
} finally {
if (!succ) {
alloc.freeBlock(offset);
alloc.freeBlock(offset, len);
}
}
realCacheSize.add(len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -169,15 +171,15 @@ public void testBucketAllocator() throws BucketAllocatorException {
final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

boolean full = false;
ArrayList<Long> allocations = new ArrayList<>();
ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
// Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
// the cache is completely filled.
List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
while (!full) {
Integer blockSize = null;
try {
blockSize = randFrom(tmp);
allocations.add(mAllocator.allocateBlock(blockSize));
allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
} catch (CacheFullException cfe) {
tmp.remove(blockSize);
if (tmp.isEmpty()) full = true;
Expand All @@ -190,8 +192,11 @@ public void testBucketAllocator() throws BucketAllocatorException {
assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
}

for (long offset : allocations) {
assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset));
mAllocator.logDebugStatistics();

for (Pair<Long, Integer> allocation : allocations) {
assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
}
assertEquals(0, mAllocator.getUsedSize());
}
Expand Down Expand Up @@ -674,7 +679,7 @@ public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {

// initialize an mocked ioengine.
IOEngine ioEngine = Mockito.mock(IOEngine.class);
Mockito.when(ioEngine.usesSharedMemory()).thenReturn(false);
when(ioEngine.usesSharedMemory()).thenReturn(false);
// Mockito.doNothing().when(ioEngine).write(Mockito.any(ByteBuffer.class), Mockito.anyLong());
Mockito.doThrow(RuntimeException.class).when(ioEngine).write(Mockito.any(ByteBuffer.class),
Mockito.anyLong());
Expand Down