Skip to content

Commit

Permalink
[FLINK-30473][network] Optimize the InputGate network memory manageme…
Browse files Browse the repository at this point in the history
…nt for TaskManager

This closes #21620
  • Loading branch information
TanYuxin-tyx authored and xintongsong committed Jan 17, 2023
1 parent afdf4a7 commit ae8de97
Show file tree
Hide file tree
Showing 18 changed files with 716 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@
<td>Integer</td>
<td>Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers will be used when the subtask cannot apply to the normal buffers due to back pressure, while subtask is performing an action that can not be interrupted in the middle, like serializing a large record, flatMap operator producing multiple records for one single input record or processing time timer producing large output. In situations like that system will allow subtask to request overdraft buffers, so that the subtask can finish such uninterruptible action, without blocking unaligned checkpoints for long period of time. Overdraft buffers are provided on best effort basis only if the system has some unused buffers available. Subtask that has used overdraft buffers won't be allowed to process any more records until the overdraft buffers are returned to the pool.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.read-buffer.required-per-gate.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of network read buffers that are required by an input gate. (An input gate is responsible for reading data from all subtasks of an upstream task.) The number of buffers needed by an input gate is dynamically calculated in runtime, depending on various factors (e.g., the parallelism of the upstream task). Among the calculated number of needed buffers, the part below this configured value is required, while the excess part, if any, is optional. A task will fail if the required buffers cannot be obtained in runtime. A task will not fail due to not obtaining optional buffers, but may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads, and 1000 for batch workloads. If explicitly configured, the configured value should be at least 1.</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.client.connectTimeoutSec</h5></td>
<td style="word-wrap: break-word;">120</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@
<td>Integer</td>
<td>Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers will be used when the subtask cannot apply to the normal buffers due to back pressure, while subtask is performing an action that can not be interrupted in the middle, like serializing a large record, flatMap operator producing multiple records for one single input record or processing time timer producing large output. In situations like that system will allow subtask to request overdraft buffers, so that the subtask can finish such uninterruptible action, without blocking unaligned checkpoints for long period of time. Overdraft buffers are provided on best effort basis only if the system has some unused buffers available. Subtask that has used overdraft buffers won't be allowed to process any more records until the overdraft buffers are returned to the pool.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.read-buffer.required-per-gate.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of network read buffers that are required by an input gate. (An input gate is responsible for reading data from all subtasks of an upstream task.) The number of buffers needed by an input gate is dynamically calculated in runtime, depending on various factors (e.g., the parallelism of the upstream task). Among the calculated number of needed buffers, the part below this configured value is required, while the excess part, if any, is optional. A task will fail if the required buffers cannot be obtained in runtime. A task will not fail due to not obtaining optional buffers, but may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads, and 1000 for batch workloads. If explicitly configured, the configured value should be at least 1.</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.client.connectTimeoutSec</h5></td>
<td style="word-wrap: break-word;">120</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,35 @@ public class NettyShuffleEnvironmentOptions {
+ " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be"
+ " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");

/**
* Maximum number of network buffers to use for each outgoing/incoming gate (result
* partition/input gate), which contains all exclusive network buffers for all subpartitions and
* all floating buffers for the gate. The exclusive network buffers for one channel is
* configured by {@link #NETWORK_BUFFERS_PER_CHANNEL} and the floating buffers for one gate is
* configured by {@link #NETWORK_EXTRA_BUFFERS_PER_GATE}.
*/
@Experimental
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Integer> NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE =
key("taskmanager.network.memory.read-buffer.required-per-gate.max")
.intType()
.noDefaultValue()
.withDescription(
"The maximum number of network read buffers that are required by an"
+ " input gate. (An input gate is responsible for reading data"
+ " from all subtasks of an upstream task.) The number of buffers"
+ " needed by an input gate is dynamically calculated in runtime,"
+ " depending on various factors (e.g., the parallelism of the"
+ " upstream task). Among the calculated number of needed buffers,"
+ " the part below this configured value is required, while the"
+ " excess part, if any, is optional. A task will fail if the"
+ " required buffers cannot be obtained in runtime. A task will"
+ " not fail due to not obtaining optional buffers, but may"
+ " suffer a performance reduction. If not explicitly configured,"
+ " the default value is Integer.MAX_VALUE for streaming workloads,"
+ " and 1000 for batch workloads. If explicitly configured, the"
+ " configured value should be at least 1.");

/**
* Minimum number of network buffers required per blocking result partition for sort-shuffle.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.partition.consumer;

/**
* The buffer specs of the {@link InputGate} include exclusive buffers per channel, required/total
* floating buffers and the target of total buffers.
*/
public class GateBuffersSpec {

private final int effectiveExclusiveBuffersPerChannel;

private final int requiredFloatingBuffers;

private final int totalFloatingBuffers;

private final int targetTotalBuffersPerGate;

GateBuffersSpec(
int effectiveExclusiveBuffersPerChannel,
int requiredFloatingBuffers,
int totalFloatingBuffers,
int targetTotalBuffersPerGate) {
this.effectiveExclusiveBuffersPerChannel = effectiveExclusiveBuffersPerChannel;
this.requiredFloatingBuffers = requiredFloatingBuffers;
this.totalFloatingBuffers = totalFloatingBuffers;
this.targetTotalBuffersPerGate = targetTotalBuffersPerGate;
}

int getRequiredFloatingBuffers() {
return requiredFloatingBuffers;
}

int getTotalFloatingBuffers() {
return totalFloatingBuffers;
}

int getEffectiveExclusiveBuffersPerChannel() {
return effectiveExclusiveBuffersPerChannel;
}

public int targetTotalBuffersPerGate() {
return targetTotalBuffersPerGate;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;

/** Utils to manage the specs of the {@link InputGate}, for example, {@link GateBuffersSpec}. */
public class InputGateSpecUitls {

public static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH = 1000;

public static final int DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM = Integer.MAX_VALUE;

public static GateBuffersSpec createGateBuffersSpec(
Optional<Integer> configuredMaxRequiredBuffersPerGate,
int configuredNetworkBuffersPerChannel,
int configuredFloatingNetworkBuffersPerGate,
ResultPartitionType partitionType,
int numInputChannels) {
int maxRequiredBuffersThresholdPerGate =
getEffectiveMaxRequiredBuffersPerGate(
partitionType, configuredMaxRequiredBuffersPerGate);
int targetRequiredBuffersPerGate =
getRequiredBuffersTargetPerGate(
numInputChannels, configuredNetworkBuffersPerChannel);
int targetTotalBuffersPerGate =
getTotalBuffersTargetPerGate(
numInputChannels,
configuredNetworkBuffersPerChannel,
configuredFloatingNetworkBuffersPerGate);
int requiredBuffersPerGate =
Math.min(maxRequiredBuffersThresholdPerGate, targetRequiredBuffersPerGate);

int effectiveExclusiveBuffersPerChannel =
getExclusiveBuffersPerChannel(
configuredNetworkBuffersPerChannel,
numInputChannels,
requiredBuffersPerGate);
int effectiveExclusiveBuffersPerGate =
getEffectiveExclusiveBuffersPerGate(
numInputChannels, effectiveExclusiveBuffersPerChannel);

int requiredFloatingBuffers = requiredBuffersPerGate - effectiveExclusiveBuffersPerGate;
int totalFloatingBuffers = targetTotalBuffersPerGate - effectiveExclusiveBuffersPerGate;

checkState(requiredFloatingBuffers > 0, "Must be positive.");
checkState(
requiredFloatingBuffers <= totalFloatingBuffers,
"Wrong number of floating buffers.");

return new GateBuffersSpec(
effectiveExclusiveBuffersPerChannel,
requiredFloatingBuffers,
totalFloatingBuffers,
targetTotalBuffersPerGate);
}

@VisibleForTesting
static int getEffectiveMaxRequiredBuffersPerGate(
ResultPartitionType partitionType,
Optional<Integer> configuredMaxRequiredBuffersPerGate) {
return configuredMaxRequiredBuffersPerGate.orElseGet(
() ->
partitionType.isPipelinedOrPipelinedBoundedResultPartition()
? DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_STREAM
: DEFAULT_MAX_REQUIRED_BUFFERS_PER_GATE_FOR_BATCH);
}

/**
* Since at least one floating buffer is required, the number of required buffers is reduced by
* 1, and then the average number of buffers per channel is calculated. Returning the minimum
* value to ensure that the number of required buffers per gate is not more than the given
* requiredBuffersPerGate.}.
*/
private static int getExclusiveBuffersPerChannel(
int configuredNetworkBuffersPerChannel,
int numInputChannels,
int requiredBuffersPerGate) {
checkArgument(numInputChannels > 0, "Must be positive.");
checkArgument(requiredBuffersPerGate >= 1, "Require at least 1 buffer per gate.");
return Math.min(
configuredNetworkBuffersPerChannel,
(requiredBuffersPerGate - 1) / numInputChannels);
}

private static int getRequiredBuffersTargetPerGate(
int numInputChannels, int configuredNetworkBuffersPerChannel) {
return numInputChannels * configuredNetworkBuffersPerChannel + 1;
}

private static int getTotalBuffersTargetPerGate(
int numInputChannels,
int configuredNetworkBuffersPerChannel,
int configuredFloatingBuffersPerGate) {
return numInputChannels * configuredNetworkBuffersPerChannel
+ configuredFloatingBuffersPerGate;
}

private static int getEffectiveExclusiveBuffersPerGate(
int numInputChannels, int effectiveExclusiveBuffersPerChannel) {
return effectiveExclusiveBuffersPerChannel * numInputChannels;
}
}

0 comments on commit ae8de97

Please sign in to comment.