Skip to content

Commit

Permalink
HubSpot Backport: HBASE-27225 Add BucketAllocator bucket size statist…
Browse files Browse the repository at this point in the history
…ic logging (apache#4637) (addendum)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
  • Loading branch information
bbeaudreault committed Jul 26, 2022
1 parent 96e7283 commit 4e944f2
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,15 @@ final class BucketSizeInfo {
// Free bucket means it has space to allocate a block;
// Completely free bucket means it has no block.
private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
// only modified under synchronization, but also read outside it.
private volatile long fragmentationBytes;
private int sizeIndex;

BucketSizeInfo(int sizeIndex) {
bucketList = new LinkedMap();
freeBuckets = new LinkedMap();
completelyFreeBuckets = new LinkedMap();
fragmentationBytes = 0;
this.sizeIndex = sizeIndex;
}

Expand All @@ -196,7 +199,7 @@ public int sizeIndex() {
* Find a bucket to allocate a block
* @return the offset in the IOEngine
*/
public long allocateBlock() {
public long allocateBlock(int blockSize) {
Bucket b = null;
if (freeBuckets.size() > 0) {
// Use up an existing one first...
Expand All @@ -209,6 +212,9 @@ public long allocateBlock() {
if (b == null) return -1;
long result = b.allocate();
blockAllocated(b);
if (blockSize < b.getItemAllocationSize()) {
fragmentationBytes += b.getItemAllocationSize() - blockSize;
}
return result;
}

Expand Down Expand Up @@ -239,11 +245,14 @@ private synchronized void removeBucket(Bucket b) {
completelyFreeBuckets.remove(b);
}

public void freeBlock(Bucket b, long offset) {
public void freeBlock(Bucket b, long offset, int length) {
assert bucketList.containsKey(b);
// else we shouldn't have anything to free...
assert (!completelyFreeBuckets.containsKey(b));
b.free(offset);
if (length < b.getItemAllocationSize()) {
fragmentationBytes -= b.getItemAllocationSize() - length;
}
if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
}
Expand All @@ -265,9 +274,9 @@ public synchronized IndexStatistics statistics() {
// if bucket capacity is not perfectly divisible by a bucket's object size, there will
// be some left over per bucket. for some object sizes this may be large enough to be
// non-trivial and worth tuning by choosing a more divisible object size.
long waistedBytes = (bucketCapacity % bucketObjectSize) * (full + fillingBuckets);
return new IndexStatistics(free, used, bucketObjectSize, full,
completelyFreeBuckets.size(), waistedBytes);
long wastedBytes = (bucketCapacity % bucketObjectSize) * (full + fillingBuckets);
return new IndexStatistics(free, used, bucketObjectSize, full, completelyFreeBuckets.size(),
wastedBytes, fragmentationBytes);
}

@Override
Expand Down Expand Up @@ -459,7 +468,7 @@ public synchronized long allocateBlock(int blockSize) throws CacheFullException,
"; adjust BucketCache sizes " + BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY +
" to accomodate if size seems reasonable and you want it cached.");
}
long offset = bsi.allocateBlock();
long offset = bsi.allocateBlock(blockSize);

// Ask caller to free up space and try again!
if (offset < 0)
Expand All @@ -481,11 +490,11 @@ private Bucket grabGlobalCompletelyFreeBucket() {
* @param offset block's offset
* @return size freed
*/
public synchronized int freeBlock(long offset) {
public synchronized int freeBlock(long offset, int length) {
int bucketNo = (int) (offset / bucketCapacity);
assert bucketNo >= 0 && bucketNo < buckets.length;
Bucket targetBucket = buckets[bucketNo];
bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset, length);
usedSize -= targetBucket.getItemAllocationSize();
return targetBucket.getItemAllocationSize();
}
Expand All @@ -504,68 +513,141 @@ public int sizeOfAllocation(long offset) {
return targetBucket.getItemAllocationSize();
}

/**
* Statistics to give a glimpse into the distribution of BucketCache objects. Each configured
* bucket size, denoted by {@link BucketSizeInfo}, gets an IndexStatistic. A BucketSizeInfo
* allocates blocks of a configured size from claimed buckets. If you have a bucket size of 512k,
* the corresponding BucketSizeInfo will always allocate chunks of 512k at a time regardless of
* actual request.
* <p>
* Over time, as a BucketSizeInfo gets more allocations, it will claim more buckets from the total
* pool of completelyFreeBuckets. As blocks are freed from a BucketSizeInfo, those buckets may be
* returned to the completelyFreeBuckets pool.
* <p>
* The IndexStatistics help visualize how these buckets are currently distributed, through counts
* of items, bytes, and fullBuckets. Additionally, mismatches between block sizes and bucket sizes
* can manifest in inefficient cache usage. These typically manifest in three ways:
* <p>
* 1. Allocation failures, because block size is larger than max bucket size. These show up in
* logs and can be alleviated by adding larger bucket sizes if appropriate.<br>
* 2. Memory fragmentation, because blocks are typically smaller than the bucket size. See
* {@link #fragmentationBytes()} for details.<br>
* 3. Memory waste, because a bucket's itemSize is not a perfect divisor of bucketCapacity. See
* {@link #wastedBytes()} for details.<br>
*/
static class IndexStatistics {
private long freeCount, usedCount, itemSize, totalCount, waistedBytes;
private long freeCount, usedCount, itemSize, totalCount, wastedBytes, fragmentationBytes;
private int fullBuckets, completelyFreeBuckets;

/**
* How many more items can be allocated from the currently claimed buckets of this bucket size
*/
public long freeCount() {
return freeCount;
}

/**
* How many items are currently taking up space in this bucket size's buckets
*/
public long usedCount() {
return usedCount;
}

/**
* Combined {@link #freeCount()} + {@link #usedCount()}
*/
public long totalCount() {
return totalCount;
}

/**
* How many more bytes can be allocated from the currently claimed buckets of this bucket size
*/
public long freeBytes() {
return freeCount * itemSize;
}

/**
* How many bytes are currently taking up space in this bucket size's buckets. Note: If your
* items are smaller than the bucket size of this bucket, the actual bytes used by items will be
* lower than this value. But since a bucket size can only allocate items of a single size, this
* value is the true number of used bytes. The difference will be counted in
* {@link #fragmentationBytes()}.
*/
public long usedBytes() {
return usedCount * itemSize;
}

/**
* Combined {@link #totalCount()} * {@link #itemSize()}
*/
public long totalBytes() {
return totalCount * itemSize;
}

/**
* This bucket size can only allocate items of this size, even if the requested allocation size
* is smaller. The rest goes towards {@link #fragmentationBytes()}.
*/
public long itemSize() {
return itemSize;
}

/**
* How many buckets have been completely filled by blocks for this bucket size. These buckets
* can't accept any more blocks unless some existing blocks are freed.
*/
public int fullBuckets() {
return fullBuckets;
}

/**
* How many buckets are currently claimed by this bucket size but as yet totally unused. These
* buckets are available for reallocation to other bucket sizes if those fill up.
*/
public int completelyFreeBuckets() {
return completelyFreeBuckets;
}

public long waistedBytes() {
return waistedBytes;
/**
* If {@link #bucketCapacity} is not perfectly divisible by this {@link #itemSize()}, the
* remainder will be unusable by buckets of this size. A high value here may be reduced by
* choosing bucket sizes which can better divide {@link #bucketCapacity}.
*/
public long wastedBytes() {
return wastedBytes;
}

/**
* Every time you allocate blocks in these buckets where the block size is less than the bucket
* size, fragmentation increases by that difference. You can reduce fragmentation by lowering
* the bucket size so that it is closer to the typical block size. This may have the consequence
* of bumping some blocks to the next larger bucket size, so experimentation may be needed.
*/
public long fragmentationBytes() {
return fragmentationBytes;
}

public IndexStatistics(long free, long used, long itemSize, int fullBuckets,
int completelyFreeBuckets, long waistedBytes) {
setTo(free, used, itemSize, fullBuckets, completelyFreeBuckets, waistedBytes);
int completelyFreeBuckets, long wastedBytes, long fragmentationBytes) {
setTo(free, used, itemSize, fullBuckets, completelyFreeBuckets, wastedBytes,
fragmentationBytes);
}

public IndexStatistics() {
setTo(-1, -1, 0, 0, 0, 0);
setTo(-1, -1, 0, 0, 0, 0, 0);
}

public void setTo(long free, long used, long itemSize, int fullBuckets,
int completelyFreeBuckets, long waistedBytes) {
int completelyFreeBuckets, long wastedBytes, long fragmentationBytes) {
this.itemSize = itemSize;
this.freeCount = free;
this.usedCount = used;
this.totalCount = free + used;
this.fullBuckets = fullBuckets;
this.completelyFreeBuckets = completelyFreeBuckets;
this.waistedBytes = waistedBytes;
this.wastedBytes = wastedBytes;
this.fragmentationBytes = fragmentationBytes;
}
}

Expand All @@ -581,27 +663,35 @@ void logDebugStatistics() {
IndexStatistics total = new IndexStatistics();
IndexStatistics[] stats = getIndexStatistics(total);
LOG.debug("Bucket allocator statistics follow:");
LOG.debug(" Free bytes={}; used bytes={}; total bytes={}; waisted bytes={}; completelyFreeBuckets={}",
total.freeBytes(), total.usedBytes(), total.totalBytes(), total.waistedBytes(), total.completelyFreeBuckets());
LOG.debug(
" Free bytes={}; used bytes={}; total bytes={}; wasted bytes={}; fragmentation bytes={}; "
+ "completelyFreeBuckets={}",
total.freeBytes(), total.usedBytes(), total.totalBytes(), total.wastedBytes(),
total.fragmentationBytes(), total.completelyFreeBuckets());
for (IndexStatistics s : stats) {
LOG.debug(" Object size {}; used={}; free={}; total={}; waisted bytes={}; full buckets={}",
s.itemSize(), s.usedCount(), s.freeCount(), s.totalCount(), s.waistedBytes(), s.fullBuckets());
LOG.debug(
" Object size {}; used={}; free={}; total={}; wasted bytes={}; fragmentation bytes={}, "
+ "full buckets={}",
s.itemSize(), s.usedCount(), s.freeCount(), s.totalCount(), s.wastedBytes(),
s.fragmentationBytes(), s.fullBuckets());
}
}

IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
IndexStatistics[] stats = getIndexStatistics();
long totalfree = 0, totalused = 0, totalWaisted = 0;
long totalfree = 0, totalused = 0, totalWasted = 0, totalFragmented = 0;
int fullBuckets = 0, completelyFreeBuckets = 0;

for (IndexStatistics stat : stats) {
totalfree += stat.freeBytes();
totalused += stat.usedBytes();
totalWaisted += stat.waistedBytes();
totalWasted += stat.wastedBytes();
totalFragmented += stat.fragmentationBytes();
fullBuckets += stat.fullBuckets();
completelyFreeBuckets += stat.completelyFreeBuckets();
}
grandTotal.setTo(totalfree, totalused, 1, fullBuckets, completelyFreeBuckets, totalWaisted);
grandTotal.setTo(totalfree, totalused, 1, fullBuckets, completelyFreeBuckets, totalWasted,
totalFragmented);
return stats;
}

Expand All @@ -612,13 +702,6 @@ IndexStatistics[] getIndexStatistics() {
return stats;
}

public long freeBlock(long freeList[]) {
long sz = 0;
for (int i = 0; i < freeList.length; ++i)
sz += freeBlock(freeList[i]);
return sz;
}

/**
 * Maps an offset to a bucket index by dividing it by {@code bucketCapacity} and truncating the
 * quotient to an int.
 * @param offset the offset to look up
 * @return the quotient {@code offset / bucketCapacity}, cast to int
 */
public int getBucketIndex(long offset) {
return (int) (offset / bucketCapacity);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decre
* {@link BucketEntry#refCnt} becoming 0.
*/
void freeBucketEntry(BucketEntry bucketEntry) {
bucketAllocator.freeBlock(bucketEntry.offset());
bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
realCacheSize.add(-1 * bucketEntry.getLength());
}

Expand Down Expand Up @@ -1053,8 +1053,9 @@ void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
checkIOErrorIsTolerated();
// Since we failed sync, free the blocks in bucket allocator
for (int i = 0; i < entries.size(); ++i) {
if (bucketEntries[i] != null) {
bucketAllocator.freeBlock(bucketEntries[i].offset());
BucketEntry bucketEntry = bucketEntries[i];
if (bucketEntry != null) {
bucketAllocator.freeBlock(bucketEntry.offset(), bucketEntry.getLength());
bucketEntries[i] = null;
}
}
Expand Down Expand Up @@ -1467,7 +1468,7 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a
succ = true;
} finally {
if (!succ) {
alloc.freeBlock(offset);
alloc.freeBlock(offset, len);
}
}
realCacheSize.add(len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -161,15 +162,15 @@ public void testBucketAllocator() throws BucketAllocatorException {
final List<Integer> BLOCKSIZES = Arrays.asList(4 * 1024, 8 * 1024, 64 * 1024, 96 * 1024);

boolean full = false;
ArrayList<Long> allocations = new ArrayList<>();
ArrayList<Pair<Long, Integer>> allocations = new ArrayList<>();
// Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
// the cache is completely filled.
List<Integer> tmp = new ArrayList<>(BLOCKSIZES);
while (!full) {
Integer blockSize = null;
try {
blockSize = randFrom(tmp);
allocations.add(mAllocator.allocateBlock(blockSize));
allocations.add(new Pair<>(mAllocator.allocateBlock(blockSize), blockSize));
} catch (CacheFullException cfe) {
tmp.remove(blockSize);
if (tmp.isEmpty()) full = true;
Expand All @@ -180,12 +181,19 @@ public void testBucketAllocator() throws BucketAllocatorException {
BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
IndexStatistics indexStatistics = bucketSizeInfo.statistics();
assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());

// We know the block sizes above are multiples of 1024, but default bucket sizes give an
// additional 1024 on top of that, so this counts towards fragmentation in our test.
// Real life may have worse fragmentation because blocks may not be perfectly sized to block
// size, given encoding/compression and large rows.
assertEquals(1024 * indexStatistics.totalCount(), indexStatistics.fragmentationBytes());
}

mAllocator.logDebugStatistics();

for (long offset : allocations) {
assertEquals(mAllocator.sizeOfAllocation(offset), mAllocator.freeBlock(offset));
for (Pair<Long, Integer> allocation : allocations) {
assertEquals(mAllocator.sizeOfAllocation(allocation.getFirst()),
mAllocator.freeBlock(allocation.getFirst(), allocation.getSecond()));
}
assertEquals(0, mAllocator.getUsedSize());
}
Expand Down

0 comments on commit 4e944f2

Please sign in to comment.