Skip to content

Commit

Permalink
[FLINK-30712][docs][network] Update documents for taskmanager memory …
Browse files Browse the repository at this point in the history
…configurations and tuning

This closes #21843
  • Loading branch information
TanYuxin-tyx authored and xintongsong committed Feb 10, 2023
1 parent 62f98be commit ae89c99
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 77 deletions.
18 changes: 10 additions & 8 deletions docs/content.zh/docs/deployment/memory/network_mem_tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,23 @@ The actual value of parallelism from which the problem occurs is various from jo
## 网络缓冲生命周期

Flink 有多个本地缓冲区池 —— 每个输出和输入流对应一个。
每个缓冲区池的大小被限制为
每个缓冲区池的目标缓冲区数由下面的公式计算得到。

`#channels * taskmanager.network.memory.buffers-per-channel + taskmanager.network.memory.floating-buffers-per-gate`

缓冲区的大小可以通过 `taskmanager.memory.segment-size` 来设置。
每个缓冲区(Buffer)的大小可以通过 `taskmanager.memory.segment-size` 来设置。

### 输入网络缓冲

输入通道中的缓冲区被分为独占缓冲区(exclusive buffer)和流动缓冲区(floating buffer)。每个独占缓冲区只能被一个特定的通道使用。
一个通道可以从输入流的共享缓冲区池中申请额外的流动缓冲区。剩余的流动缓冲区是可选的并且只有资源足够的时候才能获取。
缓冲区池不一定总能达到目标缓冲区数。有一个阈值控制 Flink 在无法获取到缓冲区时是否会失败。
目标缓冲区数中,小于阈值的的部分被称为必须(Required)缓冲区,剩余的部分(如果有的话)是可选(Optional)缓冲区。
如果无法获得必须缓冲区,会导致任务失败。
如果无法获得可选缓冲区,任务不会失败,但可能会降低性能。

在初始阶段:
- Flink 会为每一个输入通道获取配置数量的独占缓冲区
- 所有的独占缓冲区都必须被满足,否则作业会抛异常失败
- Flink 至少要有一个流动缓冲区才能运行
对于流作业,这个阈值的默认值是Integer.MAX_VALUE,对于批作业,默认值是1000。
我们不建议用户更改这个阈值,除非用户有充分的理由修改它,并非常明确修改这个阈值带来的影响
这个阈值的配置选项是`taskmanager.network.memory.read-buffer.required-per-gate.max`
通常,阈值越小,出现“网络缓冲区数量不足”异常的可能性越小,但可能导致作业静默地性能下降,反之亦然

### 输出网络缓冲

Expand Down
19 changes: 12 additions & 7 deletions docs/content/docs/deployment/memory/network_mem_tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,25 @@ The actual value of parallelism from which the problem occurs is various from jo
## Network buffer lifecycle

Flink has several local buffer pools - one for the output stream and one for each input gate.
Each of those pools is limited to at most
The target size of each buffer pool is calculated by the following formula.

`#channels * taskmanager.network.memory.buffers-per-channel + taskmanager.network.memory.floating-buffers-per-gate`

The size of the buffer can be configured by setting `taskmanager.memory.segment-size`.

### Input network buffers

Buffers in the input channel are divided into exclusive and floating buffers. Exclusive buffers can be used by only one particular channel. A channel can request additional floating buffers from a buffer pool shared across all channels belonging to the given input gate. The remaining floating buffers are optional and are acquired only if there are enough resources available.

In the initialization phase:
- Flink will try to acquire the configured amount of exclusive buffers for each channel
- all exclusive buffers must be fulfilled or the job will fail with an exception
- a single floating buffer has to be allocated for Flink to be able to make progress
The target buffer pool size is not always reached.
There's a threshold controlling whether Flink should fail upon not obtaining buffers.
The part of the target number of buffers that below this threshold is considered required.
The remaining, if any, is optional.
Not obtaining required buffers will lead to task failures.
A task will not fail if it cannot obtain optional buffers, but may suffer a performance reduction.

The default value for this threshold is `Integer.MAX_VALUE` for streaming workloads, and `1000` for batch workloads.
We do not recommend users to change this threshold, unless the user has good reasons and knows what he/she is doing well.
The relevant configuration option is `taskmanager.network.memory.read-buffer.required-per-gate.max`.
In general, a smaller threshold leads to less chance of the "insufficient number of network buffers" exception, while the workloads may suffer performance reduction silently, and vice versa.

### Output network buffers

Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/ops/batch/batch_shuffle.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ If [SSL]({{< ref "docs/deployment/security/security-ssl" >}}) is enabled, the `f
The memory usage of `mmap` is not accounted for by configured memory limits, but some resource frameworks like Yarn will track this memory usage and kill the container if memory exceeds some threshold.
{{< /hint >}}

`Hash Shuffle` works well for small scale jobs with SSD, but it also have some disadvantages:
`Hash Shuffle` works well for small scale jobs with SSD, but it also has some disadvantages:

1. If the job scale is large, it might create too many files, and it requires a large write buffer to write these files at the same time.
2. On HDD, when multiple downstream tasks fetch their data simultaneously, it might incur the issue of random IO.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
<td>Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/input channel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. The minimum valid value that can be configured is 0. When 0 buffers-per-channel is configured, the exclusive network buffers used per downstream incoming channel will be 0, but for each upstream outgoing channel, max(1, configured value) will be used. In other words we ensure that, for performance reasons, there is at least one buffer per outgoing channel regardless of the configuration.</td>
<td>Number of exclusive network buffers for each outgoing/incoming channel (subpartition/input channel) in the credit-based flow control model. For the outgoing channel(subpartition), this value is the effective exclusive buffers per channel. For the incoming channel(input channel), this value is the max number of exclusive buffers per channel, the number of effective exclusive network buffers per channel is dynamically calculated from taskmanager.network.memory.read-buffer.required-per-gate.max and the effective range is from 0 to the configured value. The minimum valid value for the option is 0. When the option is configured as 0, the exclusive network buffers used by downstream incoming channel will be 0, but for each upstream outgoing channel, max(1, configured value) will be used. In other words, we ensure that, for performance reasons, at least one buffer is used per outgoing channel regardless of the configuration.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
<td style="word-wrap: break-word;">8</td>
<td>Integer</td>
<td>Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.</td>
<td>Number of floating network buffers for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the channels. The floating buffers can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. For the outgoing gate(result partition), this value is the effective floating buffers per gate. For the incoming gate(input gate), this value is a recommended number of floating buffers, the number of effective floating network buffers per gate is dynamically calculated from taskmanager.network.memory.read-buffer.required-per-gate.max and the range of effective floating buffers is from 0 to (parallelism - 1).</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.max-buffers-per-channel</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@
<td>Duration</td>
<td>Only relevant if <code class="highlighter-rouge">execution.checkpointing.unaligned.enabled</code> is enabled.<br /><br />If timeout is 0, checkpoints will always start unaligned.<br /><br />If timeout has a positive value, checkpoints will start aligned. If during checkpointing, checkpoint start delay exceeds this timeout, alignment will timeout and checkpoint barrier will start working as unaligned checkpoint.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.unaligned.max-subtasks-per-channel-state-file</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
<td>Defines the maximum number of subtasks that share the same channel state file. It can reduce the number of small files when enable unaligned checkpoint. Each subtask will create a new channel state file when this is configured to 1.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.checkpoints-after-tasks-finish.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down Expand Up @@ -86,5 +80,11 @@
<td>Boolean</td>
<td>Forces unaligned checkpoints, particularly allowing them for iterative jobs.</td>
</tr>
<tr>
<td><h5>execution.checkpointing.unaligned.max-subtasks-per-channel-state-file</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
<td>Defines the maximum number of subtasks that share the same channel state file. It can reduce the number of small files when enable unaligned checkpoint. Each subtask will create a new channel state file when this is configured to 1.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
<td>Number of exclusive network buffers to use for each outgoing/incoming channel (subpartition/input channel) in the credit-based flow control model. It should be configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization. The minimum valid value that can be configured is 0. When 0 buffers-per-channel is configured, the exclusive network buffers used per downstream incoming channel will be 0, but for each upstream outgoing channel, max(1, configured value) will be used. In other words we ensure that, for performance reasons, there is at least one buffer per outgoing channel regardless of the configuration.</td>
<td>Number of exclusive network buffers for each outgoing/incoming channel (subpartition/input channel) in the credit-based flow control model. For the outgoing channel(subpartition), this value is the effective exclusive buffers per channel. For the incoming channel(input channel), this value is the max number of exclusive buffers per channel, the number of effective exclusive network buffers per channel is dynamically calculated from taskmanager.network.memory.read-buffer.required-per-gate.max and the effective range is from 0 to the configured value. The minimum valid value for the option is 0. When the option is configured as 0, the exclusive network buffers used by downstream incoming channel will be 0, but for each upstream outgoing channel, max(1, configured value) will be used. In other words, we ensure that, for performance reasons, at least one buffer is used per outgoing channel regardless of the configuration.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
<td style="word-wrap: break-word;">8</td>
<td>Integer</td>
<td>Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.</td>
<td>Number of floating network buffers for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the channels. The floating buffers can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. For the outgoing gate(result partition), this value is the effective floating buffers per gate. For the incoming gate(input gate), this value is a recommended number of floating buffers, the number of effective floating network buffers per gate is dynamically calculated from taskmanager.network.memory.read-buffer.required-per-gate.max and the range of effective floating buffers is from 0 to (parallelism - 1).</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.max-buffers-per-channel</h5></td>
Expand Down

0 comments on commit ae89c99

Please sign in to comment.