Skip to content

Commit

Permalink
[FLINK-33668][runtime] Decouple the network memory and job topology o…
Browse files Browse the repository at this point in the history
…n input gate side
  • Loading branch information
jiangxin369 authored and reswqa committed Mar 8, 2024
1 parent 012b893 commit 2e25789
Show file tree
Hide file tree
Showing 33 changed files with 464 additions and 212 deletions.
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.runtime.io.network.metrics;

import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -28,27 +31,35 @@
*/
public class CreditBasedInputBuffersUsageGauge extends AbstractBuffersUsageGauge {

private final FloatingBuffersUsageGauge floatingBuffersUsageGauge;
private final ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;

public CreditBasedInputBuffersUsageGauge(
FloatingBuffersUsageGauge floatingBuffersUsageGauge,
ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
SingleInputGate[] inputGates) {
public CreditBasedInputBuffersUsageGauge(SingleInputGate[] inputGates) {
super(checkNotNull(inputGates));
this.floatingBuffersUsageGauge = checkNotNull(floatingBuffersUsageGauge);
this.exclusiveBuffersUsageGauge = checkNotNull(exclusiveBuffersUsageGauge);
}

@Override
public int calculateUsedBuffers(SingleInputGate inputGate) {
return floatingBuffersUsageGauge.calculateUsedBuffers(inputGate)
+ exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate);
BufferPool bufferPool = inputGate.getBufferPool();
if (bufferPool != null) {
int numBuffers = bufferPool.bestEffortGetNumOfUsedBuffers();
for (InputChannel ic : inputGate.inputChannels()) {
if (ic instanceof RemoteInputChannel) {
RemoteInputChannel remoteInputChannel = (RemoteInputChannel) ic;
numBuffers -= remoteInputChannel.unsynchronizedGetFloatingBuffersAvailable();
numBuffers -=
remoteInputChannel.getNumExclusiveBuffers()
- remoteInputChannel.unsynchronizedGetExclusiveBuffersUsed();
}
}
return Math.max(0, numBuffers);
}
return 0;
}

@Override
public int calculateTotalBuffers(SingleInputGate inputGate) {
return floatingBuffersUsageGauge.calculateTotalBuffers(inputGate)
+ exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate);
BufferPool bufferPool = inputGate.getBufferPool();
if (bufferPool != null) {
return inputGate.getBufferPool().getNumBuffers();
}
return 0;
}
}
Expand Up @@ -47,7 +47,7 @@ public int calculateTotalBuffers(SingleInputGate inputGate) {
int totalExclusiveBuffers = 0;
for (InputChannel ic : inputGate.inputChannels()) {
if (ic instanceof RemoteInputChannel) {
totalExclusiveBuffers += ((RemoteInputChannel) ic).getInitialCredit();
totalExclusiveBuffers += ((RemoteInputChannel) ic).getNumExclusiveBuffers();
}
}
return totalExclusiveBuffers;
Expand Down
Expand Up @@ -18,43 +18,39 @@

package org.apache.flink.runtime.io.network.metrics;

import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Gauge metric measuring the floating buffers usage gauge for {@link SingleInputGate}s. */
public class FloatingBuffersUsageGauge extends AbstractBuffersUsageGauge {

public FloatingBuffersUsageGauge(SingleInputGate[] inputGates) {
private final CreditBasedInputBuffersUsageGauge totalBuffersUsageGauge;
private final ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge;

public FloatingBuffersUsageGauge(
SingleInputGate[] inputGates,
CreditBasedInputBuffersUsageGauge totalBuffersUsageGauge,
ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge) {
super(checkNotNull(inputGates));

this.totalBuffersUsageGauge = checkNotNull(totalBuffersUsageGauge);
this.exclusiveBuffersUsageGauge = checkNotNull(exclusiveBuffersUsageGauge);
}

@Override
public int calculateUsedBuffers(SingleInputGate inputGate) {
int availableFloatingBuffers = 0;
BufferPool bufferPool = inputGate.getBufferPool();
if (bufferPool != null) {
int requestedFloatingBuffers = bufferPool.bestEffortGetNumOfUsedBuffers();
for (InputChannel ic : inputGate.inputChannels()) {
if (ic instanceof RemoteInputChannel) {
availableFloatingBuffers +=
((RemoteInputChannel) ic).unsynchronizedGetFloatingBuffersAvailable();
}
}
return Math.max(0, requestedFloatingBuffers - availableFloatingBuffers);
}
return 0;
return Math.max(
0,
totalBuffersUsageGauge.calculateUsedBuffers(inputGate)
- exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate));
}

@Override
public int calculateTotalBuffers(SingleInputGate inputGate) {
BufferPool bufferPool = inputGate.getBufferPool();
if (bufferPool != null) {
return inputGate.getBufferPool().getNumBuffers();
}
return 0;
return Math.max(
0,
totalBuffersUsageGauge.calculateTotalBuffers(inputGate)
- exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate));
}
}
Expand Up @@ -213,13 +213,13 @@ private static void registerInputMetrics(
buffersGroup.gauge(METRIC_INPUT_QUEUE_LENGTH, new InputBuffersGauge(inputGates));
buffersGroup.gauge(METRIC_INPUT_QUEUE_SIZE, new InputBuffersSizeGauge(inputGates));

FloatingBuffersUsageGauge floatingBuffersUsageGauge =
new FloatingBuffersUsageGauge(inputGates);
CreditBasedInputBuffersUsageGauge creditBasedInputBuffersUsageGauge =
new CreditBasedInputBuffersUsageGauge(inputGates);
ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge =
new ExclusiveBuffersUsageGauge(inputGates);
CreditBasedInputBuffersUsageGauge creditBasedInputBuffersUsageGauge =
new CreditBasedInputBuffersUsageGauge(
floatingBuffersUsageGauge, exclusiveBuffersUsageGauge, inputGates);
FloatingBuffersUsageGauge floatingBuffersUsageGauge =
new FloatingBuffersUsageGauge(
inputGates, creditBasedInputBuffersUsageGauge, exclusiveBuffersUsageGauge);
buffersGroup.gauge(METRIC_INPUT_EXCLUSIVE_BUFFERS_USAGE, exclusiveBuffersUsageGauge);
buffersGroup.gauge(METRIC_INPUT_FLOATING_BUFFERS_USAGE, floatingBuffersUsageGauge);
buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, creditBasedInputBuffersUsageGauge);
Expand Down
Expand Up @@ -129,7 +129,7 @@ public void requestSubpartition(
partitionId,
subpartitionIndexSet,
inputChannel.getInputChannelId(),
inputChannel.getInitialCredit());
inputChannel.getNumExclusiveBuffers());

final ChannelFutureListener listener =
future -> {
Expand Down

0 comments on commit 2e25789

Please sign in to comment.