Skip to content

Commit

Permalink
HBASE-27170 ByteBuffAllocator leak when decompressing blocks near min…
Browse files Browse the repository at this point in the history
…SizeForReservoirUse (#4592)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
  • Loading branch information
bbeaudreault committed Jul 4, 2022
1 parent 724bf78 commit f3f292f
Show file tree
Hide file tree
Showing 7 changed files with 664 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,20 @@ public ByteBuff allocate(int size) {
// just allocate the ByteBuffer from on-heap.
bbs.add(allocateOnHeap(remain));
}
ByteBuff bb = ByteBuff.wrap(bbs, () -> {
for (int i = 0; i < lenFromReservoir; i++) {
this.putbackBuffer(bbs.get(i));
}
});

ByteBuff bb;
// we only need a recycler if we successfully pulled from the pool
// this matters for determining whether to add leak detection in RefCnt
if (lenFromReservoir == 0) {
bb = ByteBuff.wrap(bbs);
} else {
bb = ByteBuff.wrap(bbs, () -> {
for (int i = 0; i < lenFromReservoir; i++) {
this.putbackBuffer(bbs.get(i));
}
});
}

bb.limit(size);
return bb;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.nio;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
Expand Down Expand Up @@ -547,6 +548,28 @@ public static ByteBuff wrap(ByteBuffer buffer) {
return wrap(buffer, RefCnt.create());
}

/**
* Calling this method in strategic locations where ByteBuffs are referenced may help diagnose
* potential buffer leaks. We pass the buffer itself as a default hint, but one can use
* {@link #touch(Object)} to pass their own hint as well.
*/
@Override
public ByteBuff touch() {
return touch(this);
}

@Override
public ByteBuff touch(Object hint) {
refCnt.touch(hint);
return this;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public RefCnt getRefCnt() {
return refCnt;
}

/**
* Make this private because we don't want to expose the refCnt related wrap method to upstream.
*/
Expand Down
62 changes: 60 additions & 2 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase.nio;

import com.google.errorprone.annotations.RestrictedApi;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetectorFactory;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakTracker;

/**
* Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
Expand All @@ -31,7 +35,10 @@
@InterfaceAudience.Private
public class RefCnt extends AbstractReferenceCounted {

private Recycler recycler = ByteBuffAllocator.NONE;
private static final ResourceLeakDetector<RefCnt> detector =
ResourceLeakDetectorFactory.instance().newResourceLeakDetector(RefCnt.class);
private final Recycler recycler;
private final ResourceLeakTracker<RefCnt> leak;

/**
* Create an {@link RefCnt} with an initial reference count = 1. If the reference count become
Expand All @@ -49,15 +56,66 @@ public static RefCnt create(Recycler recycler) {

public RefCnt(Recycler recycler) {
this.recycler = recycler;
this.leak = recycler == ByteBuffAllocator.NONE ? null : detector.track(this);
}

@Override
public ReferenceCounted retain() {
maybeRecord();
return super.retain();
}

@Override
public ReferenceCounted retain(int increment) {
maybeRecord();
return super.retain(increment);
}

@Override
public boolean release() {
maybeRecord();
return super.release();
}

@Override
public boolean release(int decrement) {
maybeRecord();
return super.release(decrement);
}

@Override
protected final void deallocate() {
this.recycler.free();
if (leak != null) {
this.leak.close(this);
}
}

@Override
public RefCnt touch() {
maybeRecord();
return this;
}

@Override
public final ReferenceCounted touch(Object hint) {
throw new UnsupportedOperationException();
maybeRecord(hint);
return this;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public Recycler getRecycler() {
return recycler;
}

private void maybeRecord() {
maybeRecord(null);
}

private void maybeRecord(Object hint) {
if (leak != null) {
leak.record(hint);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.getHeapAllocationRatio;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -45,6 +46,21 @@ public class TestByteBuffAllocator {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBuffAllocator.class);

@Test
public void testRecycleOnlyPooledBuffers() {
int maxBuffersInPool = 10;
int bufSize = 1024;
int minSize = bufSize / 8;
ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);

ByteBuff buff = alloc.allocate(minSize - 1);
assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());

alloc = new ByteBuffAllocator(true, 0, bufSize, minSize);
buff = alloc.allocate(minSize * 2);
assertSame(ByteBuffAllocator.NONE, buff.getRefCnt().getRecycler());
}

@Test
public void testAllocateByteBuffToReadInto() {
int maxBuffersInPool = 10;
Expand Down Expand Up @@ -329,8 +345,6 @@ public void testByteBuffUnsupportedMethods() {
ByteBuff buf = alloc.allocate(bufSize);
assertException(() -> buf.retain(2));
assertException(() -> buf.release(2));
assertException(() -> buf.touch());
assertException(() -> buf.touch(new Object()));
}

private void assertException(Runnable r) {
Expand Down
Loading

0 comments on commit f3f292f

Please sign in to comment.