Skip to content

Commit

Permalink
[FLINK-5002] Renamed getNumberOfUsedBuffers() method to bestEffortGet…
Browse files Browse the repository at this point in the history
…NumOfUsedBuffers(), add a test to check that it does not return a negative value.
  • Loading branch information
MayerRoman committed Nov 18, 2016
1 parent 5d5637b commit cacedd5
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
/**
* Returns the number of used buffers of this buffer pool.
*/
int getNumberOfUsedBuffers();
int bestEffortGetNumOfUsedBuffers();

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public int getNumBuffers() {
}

@Override
public int getNumberOfUsedBuffers() {
public int bestEffortGetNumOfUsedBuffers() {
return numberOfRequestedMemorySegments - availableMemorySegments.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public Float getValue() {
int bufferPoolSize = 0;

for (SingleInputGate inputGate : task.getAllInputGates()) {
usedBuffers += inputGate.getBufferPool().getNumberOfUsedBuffers();
usedBuffers += inputGate.getBufferPool().bestEffortGetNumOfUsedBuffers();
bufferPoolSize += inputGate.getBufferPool().getNumBuffers();
}

Expand Down Expand Up @@ -211,7 +211,7 @@ public Float getValue() {
int bufferPoolSize = 0;

for (ResultPartition resultPartition : task.getProducedPartitions()) {
usedBuffers += resultPartition.getBufferPool().getNumberOfUsedBuffers();
usedBuffers += resultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers();
bufferPoolSize += resultPartition.getBufferPool().getNumBuffers();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -328,6 +329,34 @@ public List<Buffer> call() throws Exception {
}
}

@Test
public void testBestEffortGetNumOfUsedBuffersReturnPositive() throws Exception {
int numBuffers = 10;
localBufferPool.setNumBuffers(numBuffers);

AtomicInteger errorCounter = new AtomicInteger(0);

Thread methodRequester = new Thread(new BestEffortGetNumOfUsedBuffersRequesterTask(localBufferPool, errorCounter));
executor.submit(methodRequester);

List<Buffer> buffers = Lists.newArrayList();
for (int i = 0; i < numBuffers; i++) {
buffers.add(localBufferPool.requestBuffer());
}

localBufferPool.setNumBuffers(numBuffers / 2);

for (Buffer buffer : buffers) {
buffer.recycle();
}

localBufferPool.lazyDestroy();

methodRequester.interrupt();

assertEquals("bestEffortGetNumOfUsedBuffers() returned a negative value", 0, errorCounter.get());
}

// ------------------------------------------------------------------------
// Helpers
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -362,4 +391,27 @@ public Boolean call() throws Exception {
return true;
}
}

private static class BestEffortGetNumOfUsedBuffersRequesterTask implements Runnable {

private final AtomicInteger errorCounter;

private final BufferPool localBufferPool;

private BestEffortGetNumOfUsedBuffersRequesterTask(BufferPool localBufferPool, AtomicInteger errorCounter) {

this.localBufferPool = localBufferPool;
this.errorCounter = errorCounter;
}

@Override
public void run() {

while (!Thread.interrupted()) {
if(localBufferPool.bestEffortGetNumOfUsedBuffers() < 0){
errorCounter.incrementAndGet();
}
}
}
}
}

0 comments on commit cacedd5

Please sign in to comment.