Skip to content

Commit

Permalink
[FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyunhaii authored and XComp committed Nov 21, 2022
1 parent a5b9efe commit fca350a
Showing 1 changed file with 43 additions and 14 deletions.
Expand Up @@ -18,36 +18,60 @@

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.junit.After;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** Tests for the {@link NettyBufferPool} wrapper. */
public class NettyBufferPoolTest {

private final List<ByteBuf> needReleasing = new ArrayList<>();

@After
public void tearDown() {
try {
// Release all of the buffers.
for (ByteBuf buf : needReleasing) {
buf.release();
}

// Checks in a separate loop in case we have sliced buffers.
for (ByteBuf buf : needReleasing) {
assertEquals(0, buf.refCnt());
}
} finally {
needReleasing.clear();
}
}

@Test
public void testNoHeapAllocations() throws Exception {
NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
final NettyBufferPool nettyBufferPool = new NettyBufferPool(1);

// Buffers should prefer to be direct
assertTrue(nettyBufferPool.buffer().isDirect());
assertTrue(nettyBufferPool.buffer(128).isDirect());
assertTrue(nettyBufferPool.buffer(128, 256).isDirect());
assertTrue(releaseLater(nettyBufferPool.buffer()).isDirect());
assertTrue(releaseLater(nettyBufferPool.buffer(128)).isDirect());
assertTrue(releaseLater(nettyBufferPool.buffer(128, 256)).isDirect());

// IO buffers should prefer to be direct
assertTrue(nettyBufferPool.ioBuffer().isDirect());
assertTrue(nettyBufferPool.ioBuffer(128).isDirect());
assertTrue(nettyBufferPool.ioBuffer(128, 256).isDirect());
assertTrue(releaseLater(nettyBufferPool.ioBuffer()).isDirect());
assertTrue(releaseLater(nettyBufferPool.ioBuffer(128)).isDirect());
assertTrue(releaseLater(nettyBufferPool.ioBuffer(128, 256)).isDirect());

// Currently we fakes the heap buffer allocation with direct buffers
assertTrue(nettyBufferPool.heapBuffer().isDirect());
assertTrue(nettyBufferPool.heapBuffer(128).isDirect());
assertTrue(nettyBufferPool.heapBuffer(128, 256).isDirect());
assertTrue(releaseLater(nettyBufferPool.heapBuffer()).isDirect());
assertTrue(releaseLater(nettyBufferPool.heapBuffer(128)).isDirect());
assertTrue(releaseLater(nettyBufferPool.heapBuffer(128, 256)).isDirect());

// Composite buffers allocates the corresponding type of buffers when extending its capacity
assertTrue(nettyBufferPool.compositeHeapBuffer().capacity(1024).isDirect());
assertTrue(nettyBufferPool.compositeHeapBuffer(10).capacity(1024).isDirect());
assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer()).capacity(1024).isDirect());
assertTrue(releaseLater(nettyBufferPool.compositeHeapBuffer(10)).capacity(1024).isDirect());

// Is direct buffer pooled!
assertTrue(nettyBufferPool.isDirectBufferPooled());
Expand All @@ -60,16 +84,21 @@ public void testAllocationsStatistics() throws Exception {

{
// Single large buffer allocates one chunk
nettyBufferPool.directBuffer(chunkSize - 64);
releaseLater(nettyBufferPool.directBuffer(chunkSize - 64));
long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
assertEquals(chunkSize, allocated);
}

{
// Allocate a little more (one more chunk required)
nettyBufferPool.directBuffer(128);
releaseLater(nettyBufferPool.directBuffer(128));
long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get();
assertEquals(2 * chunkSize, allocated);
}
}

private ByteBuf releaseLater(ByteBuf buf) {
needReleasing.add(buf);
return buf;
}
}

0 comments on commit fca350a

Please sign in to comment.