Skip to content

Commit

Permalink
[FLINK-31764][runtime] Get rid of numberOfRequestedOverdraftMemorySeg…
Browse files Browse the repository at this point in the history
…ments in LocalBufferPool.
  • Loading branch information
reswqa committed May 6, 2023
1 parent f80d7a4 commit 3379c2c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@
*
* <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
* will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match
* its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments +
* numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In
* order to meet this requirement, when the size of this pool changes,
* numberOfRequestedMemorySegments and numberOfRequestedOverdraftMemorySegments can be converted to
* each other.
* its new size.
*
* <p>New buffers can be requested only when {@code numberOfRequestedMemorySegments <
* currentPoolSize + maxOverdraftBuffersPerGate}. In other words, all buffers exceeding the
* currentPoolSize will be dynamically regarded as overdraft buffers.
*
* <p>Availability is defined as returning a non-overdraft segment on a subsequent {@link
* #requestBuffer()}/ {@link #requestBufferBuilder()} and heaving a non-blocking {@link
Expand Down Expand Up @@ -124,9 +124,6 @@ class LocalBufferPool implements BufferPool {

private int maxOverdraftBuffersPerGate;

@GuardedBy("availableMemorySegments")
private int numberOfRequestedOverdraftMemorySegments;

@GuardedBy("availableMemorySegments")
private boolean isDestroyed;

Expand Down Expand Up @@ -306,13 +303,6 @@ public int getNumberOfRequestedMemorySegments() {
}
}

@VisibleForTesting
public int getNumberOfRequestedOverdraftMemorySegments() {
synchronized (availableMemorySegments) {
return numberOfRequestedOverdraftMemorySegments;
}
}

@Override
public int getNumberOfAvailableMemorySegments() {
synchronized (availableMemorySegments) {
Expand All @@ -331,11 +321,7 @@ public int getNumBuffers() {
@SuppressWarnings("FieldAccessNotGuarded")
@Override
public int bestEffortGetNumOfUsedBuffers() {
return Math.max(
0,
numberOfRequestedMemorySegments
+ numberOfRequestedOverdraftMemorySegments
- availableMemorySegments.size());
return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
}

@Override
Expand Down Expand Up @@ -452,14 +438,9 @@ private boolean requestMemorySegmentFromGlobal() {
return false;
}

checkState(
!isDestroyed,
"Destroyed buffer pools should never acquire segments - this will lead to buffer leaks.");

MemorySegment segment = networkBufferPool.requestPooledMemorySegment();
MemorySegment segment = requestPooledMemorySegment();
if (segment != null) {
availableMemorySegments.add(segment);
numberOfRequestedMemorySegments++;
return true;
}
return false;
Expand All @@ -469,17 +450,25 @@ private boolean requestMemorySegmentFromGlobal() {
private MemorySegment requestOverdraftMemorySegmentFromGlobal() {
assert Thread.holdsLock(availableMemorySegments);

if (numberOfRequestedOverdraftMemorySegments >= maxOverdraftBuffersPerGate) {
// if overdraft buffers(i.e. buffers exceeding poolSize) is greater than or equal to
// maxOverdraftBuffersPerGate, no new buffer can be requested.
if (numberOfRequestedMemorySegments - currentPoolSize >= maxOverdraftBuffersPerGate) {
return null;
}

return requestPooledMemorySegment();
}

@Nullable
@GuardedBy("availableMemorySegments")
private MemorySegment requestPooledMemorySegment() {
checkState(
!isDestroyed,
"Destroyed buffer pools should never acquire segments - this will lead to buffer leaks.");

MemorySegment segment = networkBufferPool.requestPooledMemorySegment();
if (segment != null) {
numberOfRequestedOverdraftMemorySegments++;
numberOfRequestedMemorySegments++;
}
return segment;
}
Expand Down Expand Up @@ -525,9 +514,7 @@ private void onGlobalPoolAvailable() {
private boolean shouldBeAvailable() {
assert Thread.holdsLock(availableMemorySegments);

return !availableMemorySegments.isEmpty()
&& unavailableSubpartitionsCount == 0
&& numberOfRequestedOverdraftMemorySegments == 0;
return !availableMemorySegments.isEmpty() && unavailableSubpartitionsCount == 0;
}

@GuardedBy("availableMemorySegments")
Expand Down Expand Up @@ -684,19 +671,6 @@ public void setNumBuffers(int numBuffers) {

currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);

// If pool size increases, try to convert overdraft buffer to ordinary buffer.
while (numberOfRequestedOverdraftMemorySegments > 0
&& numberOfRequestedMemorySegments < currentPoolSize) {
numberOfRequestedOverdraftMemorySegments--;
numberOfRequestedMemorySegments++;
}

// If pool size decreases, try to convert ordinary buffer to overdraft buffer.
while (numberOfRequestedMemorySegments > currentPoolSize) {
numberOfRequestedMemorySegments--;
numberOfRequestedOverdraftMemorySegments++;
}

returnExcessMemorySegments();

if (isDestroyed) {
Expand Down Expand Up @@ -760,12 +734,7 @@ private void mayNotifyAvailable(@Nullable CompletableFuture<?> toNotify) {
private void returnMemorySegment(MemorySegment segment) {
assert Thread.holdsLock(availableMemorySegments);

// When using the overdraft buffer, return the overdraft buffer first.
if (numberOfRequestedOverdraftMemorySegments > 0) {
numberOfRequestedOverdraftMemorySegments--;
} else {
numberOfRequestedMemorySegments--;
}
numberOfRequestedMemorySegments--;
networkBufferPool.recyclePooledMemorySegment(segment);
}

Expand All @@ -785,8 +754,7 @@ private void returnExcessMemorySegments() {

@GuardedBy("availableMemorySegments")
private boolean hasExcessBuffers() {
return numberOfRequestedOverdraftMemorySegments > 0
|| numberOfRequestedMemorySegments > currentPoolSize;
return numberOfRequestedMemorySegments > currentPoolSize;
}

@GuardedBy("availableMemorySegments")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,9 @@ void testDecreasePoolSizeInternal(
// set a small pool size.
bufferPool.setNumBuffers(smallPoolSize);
assertThat(bufferPool.getNumBuffers()).isEqualTo(smallPoolSize);
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
assertThat(getNumberRequestedOverdraftBuffers(bufferPool))
.isEqualTo(numRequestedOverdraftBuffersAfterDecreasing);
assertThat(bufferPool.getNumberOfRequestedMemorySegments())
assertThat(getNumberRequestedOrdinaryBuffers(bufferPool))
.isEqualTo(numRequestedOrdinaryBuffersAfterDecreasing);
assertThat(bufferPool.getNumberOfAvailableMemorySegments())
.isEqualTo(numAvailableBuffersAfterDecreasing);
Expand Down Expand Up @@ -396,16 +396,15 @@ void testIncreasePoolSizeInternal(
buffers.add(bufferPool.requestMemorySegmentBlocking());
}
assertThat(bufferPool.requestMemorySegment()).isNull();
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
.isEqualTo(maxOverdraftBuffers);
assertThat(getNumberRequestedOverdraftBuffers(bufferPool)).isEqualTo(maxOverdraftBuffers);
assertThat(bufferPool.isAvailable()).isFalse();

// set a large pool size.
bufferPool.setNumBuffers(largePoolSize);
assertThat(bufferPool.getNumBuffers()).isEqualTo(largePoolSize);
assertThat(bufferPool.getNumberOfAvailableMemorySegments())
.isEqualTo(numAvailableBuffersAfterIncreasePoolSize);
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
assertThat(getNumberRequestedOverdraftBuffers(bufferPool))
.isEqualTo(numOverdraftBuffersAfterIncreasePoolSize);
assertThat(bufferPool.isAvailable()).isEqualTo(isAvailableAfterIncreasePoolSize);

Expand Down Expand Up @@ -864,7 +863,7 @@ private void assertRequestedBufferAndIsAvailable(
if (numberOfRequestedOverdraftBuffer > 0) {
checkArgument(!isAvailable);
}
assertThat(bufferPool.getNumberOfRequestedOverdraftMemorySegments())
assertThat(getNumberRequestedOverdraftBuffers(bufferPool))
.isEqualTo(numberOfRequestedOverdraftBuffer);

assertThat(bufferPool.bestEffortGetNumOfUsedBuffers()).isEqualTo(numberOfRequestedBuffer);
Expand All @@ -875,6 +874,16 @@ private void assertRequestedBufferAndIsAvailable(
// Helpers
// ------------------------------------------------------------------------

private static int getNumberRequestedOverdraftBuffers(LocalBufferPool bufferPool) {
return Math.max(
bufferPool.getNumberOfRequestedMemorySegments() - bufferPool.getNumBuffers(), 0);
}

private static int getNumberRequestedOrdinaryBuffers(LocalBufferPool bufferPool) {
return Math.min(
bufferPool.getNumBuffers(), bufferPool.getNumberOfRequestedMemorySegments());
}

private int getNumRequestedFromMemorySegmentPool() {
return networkBufferPool.getTotalNumberOfMemorySegments()
- networkBufferPool.getNumberOfAvailableMemorySegments();
Expand Down

0 comments on commit 3379c2c

Please sign in to comment.