Skip to content

Commit

Permalink
[FLINK-4545] [network] Small adjustments to LocalBufferPool with limi…
Browse files Browse the repository at this point in the history
…ted the number of used buffers
  • Loading branch information
StephanEwen committed Mar 10, 2017
1 parent 11e2aa6 commit 206ea21
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 13 deletions.
Expand Up @@ -58,7 +58,7 @@ class LocalBufferPool implements BufferPool {
* The currently available memory segments. These are segments, which have been requested from
* the network buffer pool and are currently not handed out as Buffer instances.
*/
private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

/**
* Buffer availability listeners, which need to be notified when a Buffer becomes available.
Expand Down Expand Up @@ -297,8 +297,9 @@ public boolean addListener(EventListener<Buffer> listener) {
@Override
public void setNumBuffers(int numBuffers) throws IOException {
synchronized (availableMemorySegments) {
checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " +
numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
checkArgument(numBuffers >= numberOfRequiredMemorySegments,
"Buffer pool needs at least %s buffers, but tried to set to %s",
numberOfRequiredMemorySegments, numBuffers);

if (numBuffers > maxNumberOfMemorySegments) {
currentPoolSize = maxNumberOfMemorySegments;
Expand Down
Expand Up @@ -22,13 +22,13 @@
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;

Expand All @@ -50,7 +50,7 @@ public class NetworkBufferPool implements BufferPoolFactory {

private final int memorySegmentSize;

private final Queue<MemorySegment> availableMemorySegments;
private final ArrayBlockingQueue<MemorySegment> availableMemorySegments;

private volatile boolean isDestroyed;

Expand Down Expand Up @@ -124,9 +124,10 @@ public MemorySegment requestMemorySegment() {
return availableMemorySegments.poll();
}

// This is not safe with regard to destroy calls, but it does not hurt, because destroy happens
// only once at clean up time (task manager shutdown).
public void recycle(MemorySegment segment) {
// Adds the segment back to the queue, which does not immediately free the memory
// however, since this happens when references to the global pool are also released,
// making the availableMemorySegments queue and its contained object reclaimable
availableMemorySegments.add(segment);
}

Expand Down Expand Up @@ -260,7 +261,7 @@ private void redistributeBuffers() throws IOException {
assert Thread.holdsLock(factoryLock);

// All buffers, which are not among the required ones
int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;

if (numAvailableMemorySegment == 0) {
// in this case, we need to redistribute buffers so that every pool gets its minimum
Expand All @@ -278,7 +279,8 @@ private void redistributeBuffers() throws IOException {
* a ratio that we use to distribute the buffers.
*/

int totalCapacity = 0;
long totalCapacity = 0; // long to avoid int overflow

for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
Expand All @@ -290,9 +292,13 @@ private void redistributeBuffers() throws IOException {
return; // necessary to avoid div by zero when nothing to re-distribute
}

int memorySegmentsToDistribute = Math.min(numAvailableMemorySegment, totalCapacity);
// since one of the arguments of 'min(a,b)' is a positive int, this is actually
// guaranteed to be within the 'int' domain
// (we use a checked downCast to handle possible bugs more gracefully).
final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
Math.min(numAvailableMemorySegment, totalCapacity));

int totalPartsUsed = 0; // of totalCapacity
long totalPartsUsed = 0; // of totalCapacity
int numDistributedMemorySegment = 0;
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
Expand All @@ -307,7 +313,10 @@ private void redistributeBuffers() throws IOException {

// avoid remaining buffers by looking at the total capacity that should have been
// re-distributed up until here
int mySize = memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment;
// the downcast will always succeed, because both arguments of the subtraction are in the 'int' domain
final int mySize = MathUtils.checkedDownCast(
memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);

numDistributedMemorySegment += mySize;
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
}
Expand Down
Expand Up @@ -991,7 +991,7 @@ public void testTaskManagerServicesConfiguration() throws Exception {
config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);

TaskManagerServicesConfiguration tmConfig =
TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getByName("localhost"), true);
TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLoopbackAddress(), true);

assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);
Expand Down

0 comments on commit 206ea21

Please sign in to comment.