Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/content.zh/docs/deployment/elastic_scaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调
- 根据数据量自动推导并行度可以更好地适应每天变化的数据量
- SQL作业中的算子也可以分配不同的并行度

当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 `jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL_EXCHANGES_BLOCKING`(默认值) 。
当前 Adaptive Batch Scheduler 是 Flink 默认的批作业调度器,无需额外配置。除非用户显式的配置了使用其他调度器,例如 `jobmanager.scheduler: default`。需要注意的是,由于 ["只支持所有数据交换都为 BLOCKING 或 HYBRID 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL_EXCHANGES_BLOCKING`(默认值) 或 `ALL_EXCHANGES_HYBRID_FULL` 或 `ALL_EXCHANGES_HYBRID_SELECTIVE`

### 自动推导并发度

Expand Down Expand Up @@ -185,7 +185,7 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调

### 局限性
- **只支持批作业**: Adaptive Batch Scheduler 只支持批作业。当提交的是一个流作业时,会抛出异常。
- **只支持所有数据交换都为 BLOCKING 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 ALL_EXCHANGES_BLOCKING 的作业。
- **只支持所有数据交换都为 BLOCKING 或 HYBRID 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 ALL_EXCHANGES_BLOCKING 或 ALL_EXCHANGES_HYBRID_FULL 或 ALL_EXCHANGES_HYBRID_SELECTIVE 的作业。
- **不支持 FileInputFormat 类型的 source**: 不支持 FileInputFormat 类型的 source, 包括 `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)` 和 `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`。 当使用 Adaptive Batch Scheduler 时,用户应该使用新版的 Source API ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) 或 [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) 来读取文件.
- **Web UI 上展示的上游输出的数据量和下游收到的数据量可能不一致**: 在使用 Adaptive Batch Scheduler 自动推导并行度时,对于 broadcast 边,上游算子发送的数据量和下游算子接收的数据量可能会不相等,这在 Web UI 的显示上可能会困扰用户。细节详见 [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler)。

Expand Down
18 changes: 16 additions & 2 deletions docs/content.zh/docs/ops/batch/batch_shuffle.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,27 @@ Hybrid shuffle provides two spilling strategies:

### Usage

To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).

#### Data Consumption Constraints

Hybrid shuffle divides the partition data consumption constraints between producer and consumer into the following three cases:

- **ALL_PRODUCERS_FINISHED** : hybrid partition data can be consumed only when all producers are finished.
- **ONLY_FINISHED_PRODUCERS** : hybrid partition data can be consumed when its producer is finished.
- **UNFINISHED_PRODUCERS** : hybrid partition data can be consumed even if its producer is un-finished.

These could be configured via [jobmanager.partition.hybrid.partition-data-consume-constraint]({{< ref "docs/deployment/config" >}}#jobmanager-partition-hybrid-partition-data-consume-constraint).

- **For `AdaptiveBatchScheduler`** : The default constraint is `UNFINISHED_PRODUCERS` to perform pipelined-like shuffle. If the value is set to `ALL_PRODUCERS_FINISHED` or `ONLY_FINISHED_PRODUCERS`, performance may be degraded.
- **If `SpeculativeExecution` is enabled** : The default constraint is `ONLY_FINISHED_PRODUCERS` to bring some performance optimization compared with blocking shuffle. Since producers and consumers have the opportunity to run at the same time, more speculative execution tasks may be created, and the cost of failover will also increase. If you want to fall back to the same behavior as blocking shuffle, you can configure this value to `ALL_PRODUCERS_FINISHED`. It is also important to note that `UNFINISHED_PRODUCERS` is not supported in this mode.

### Limitations

Hybrid shuffle mode is still experimental and has some known limitations, which the Flink community is still working on eliminating.

- **No support for Slot Sharing.** In hybrid shuffle mode, Flink currently forces each task to be executed in a dedicated slot exclusively. If slot sharing is explicitly specified, an error will occur.
- **No support for Adaptive Batch Scheduler and Speculative Execution.** If adaptive batch scheduler is used in hybrid shuffle mode, an error will occur.
- **No optimization for dynamic graph.** If auto-parallelism(dynamic graph) is enabled for `AdaptiveBatchScheduler`, hybrid shuffle will always schedule tasks only when all producer are finished like blocking shuffle, this means that the constraint will fall back to `ALL_PRODUCERS_FINISHED` in this case.

## 性能调优

Expand All @@ -140,6 +153,7 @@ Hybrid shuffle mode is still experimental and has some known limitations, which
1. 增大总的网络内存。目前网络内存的大小是比较保守的。对于大规模作业,为了实现更好的性能,建议将 [网络内存比例]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) 增加至至少 0.2。为了使调整生效,你可能需要同时调整 [网络内存大小下界]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) 以及 [网络内存大小上界]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max)。要获取更多信息,你可以参考这个 [内存配置文档]({{< ref "docs/deployment/memory/mem_setup_tm" >}})。
2. 增大数据写出内存。对于大规模作业, 建议增大总内存大小,用于数据写入的内存越大, 下游越有机会直接从内存读取数据. 你需要保证每个 `Result Partition` 至少能够分配到 `numSubpartition + 1` 个buffer, 否则可能会遇到 "Insufficient number of network buffers" 错误。
3. 增大数据读取内存。对于大规模作业,建议增大 [数据读取内存]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) 到一个较大的值 (比如,256M 或 512M)。因为这个内存是从框架的堆外内存切分出来的,因此你必须增加相同的内存大小到 [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) 以避免出现直接内存溢出错误。
4. 当使用 `Hybrid Shuffle` 时, 减少 [独占网络缓冲区]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) 可能会严重影响性能. 因此,最好不要将改值设置为 `0`, 并且对于大规模作业可以适当增加该值. 同样需要注意的是: hybrid shuffle 默认会将 [taskmanager.network.memory.read-buffer.required-per-gate.max]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-read-buffer-required-per-gate-max) 设置为 `Integer.MAX_VALUE`. 最好不要去调整该配置,否则可能会造成性能的下降.
{{< /tab >}}

{{< /tabs >}}
Expand Down
4 changes: 2 additions & 2 deletions docs/content/docs/deployment/elastic_scaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ The Adaptive Batch Scheduler is a batch job scheduler that can automatically adj
- Operators from SQL batch jobs can be assigned with different parallelisms which are automatically tuned

At present, the Adaptive Batch Scheduler is the default scheduler for Flink batch jobs. No additional configuration is required unless other schedulers are explicitly configured, e.g. `jobmanager.scheduler: default`. Note that you need to
leave the [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) unset or explicitly set it to `ALL_EXCHANGES_BLOCKING` (default value) due to ["ALL_EXCHANGES_BLOCKING jobs only"](#limitations-2).
leave the [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) unset or explicitly set it to `ALL_EXCHANGES_BLOCKING` (default value) or `ALL_EXCHANGES_HYBRID_FULL` or `ALL_EXCHANGES_HYBRID_SELECTIVE` due to ["BLOCKING or HYBRID jobs only"](#limitations-2).

### Automatically decide parallelisms for operators

Expand Down Expand Up @@ -190,7 +190,7 @@ In addition, there are several related configuration options that may need adjus
### Limitations

- **Batch jobs only**: Adaptive Batch Scheduler only supports batch jobs. Exception will be thrown if a streaming job is submitted.
- **ALL_EXCHANGES_BLOCKING jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL_EXCHANGES_BLOCKING`.
- **BLOCKING or HYBRID jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL_EXCHANGES_BLOCKING / ALL_EXCHANGES_HYBRID_FULL / ALL_EXCHANGES_HYBRID_SELECTIVE`.
- **FileInputFormat sources are not supported**: FileInputFormat sources are not supported, including `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)` and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. Users should use the new sources([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) to read files when using the Adaptive Batch Scheduler.
- **Inconsistent broadcast results metrics on WebUI**: When use Adaptive Batch Scheduler to automatically decide parallelisms for operators, for broadcast results, the number of bytes/records sent by the upstream task counted by metric is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.

Expand Down
18 changes: 16 additions & 2 deletions docs/content/docs/ops/batch/batch_shuffle.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,27 @@ Hybrid shuffle provides two spilling strategies:

### Usage

To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution.batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).
To use hybrid shuffle mode, you need to configure the [execution.batch-shuffle-mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) to `ALL_EXCHANGES_HYBRID_FULL` (full spilling strategy) or `ALL_EXCHANGES_HYBRID_SELECTIVE` (selective spilling strategy).

#### Data Consumption Constraints

Hybrid shuffle divides the partition data consumption constraints between producer and consumer into the following three cases:

- **ALL_PRODUCERS_FINISHED** : hybrid partition data can be consumed only when all producers are finished.
- **ONLY_FINISHED_PRODUCERS** : hybrid partition can only consume data from finished producers.
- **UNFINISHED_PRODUCERS** : hybrid partition can consume data from unfinished producers.

These could be configured via [jobmanager.partition.hybrid.partition-data-consume-constraint]({{< ref "docs/deployment/config" >}}#jobmanager-partition-hybrid-partition-data-consume-constraint).

- **For `AdaptiveBatchScheduler`** : The default constraint is `UNFINISHED_PRODUCERS` to perform pipelined-like shuffle. If the value is set to `ALL_PRODUCERS_FINISHED` or `ONLY_FINISHED_PRODUCERS`, performance may be degraded.
- **If `SpeculativeExecution` is enabled** : The default constraint is `ONLY_FINISHED_PRODUCERS` to bring some performance optimization compared with blocking shuffle. Since producers and consumers have the opportunity to run at the same time, more speculative execution tasks may be created, and the cost of failover will also increase. If you want to fall back to the same behavior as blocking shuffle, you can configure this value to `ALL_PRODUCERS_FINISHED`. It is also important to note that `UNFINISHED_PRODUCERS` is not supported in this mode.

### Limitations

Hybrid shuffle mode is still experimental and has some known limitations, which the Flink community is still working on eliminating.

- **No support for Slot Sharing.** In hybrid shuffle mode, Flink currently forces each task to be executed in a dedicated slot exclusively. If slot sharing is explicitly specified, an error will occur.
- **No support for Adaptive Batch Scheduler and Speculative Execution.** If adaptive batch scheduler is used in hybrid shuffle mode, an error will occur.
- **No pipelined execution for dynamic graph.** If auto-parallelism (dynamic graph) is enabled, Adaptive Batch Scheduler will wait until upstream tasks finish to decide parallelism of downstream tasks, which means hybrid shuffle effectively fallback to blocking shuffle (`ALL_PRODUCERS_FINISHED` constraint).

## Performance Tuning

Expand All @@ -140,6 +153,7 @@ The following guidelines may help you to achieve better performance especially f
1. Increase the total size of network memory. Currently, the default network memory size is pretty modest. For large scale jobs, it's suggested to increase the total [network memory fraction]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-fraction) to at least 0.2 to achieve better performance. At the same time, you may also need to adjust the [lower bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and [upper bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-max) of the network memory size, please refer to the [memory configuration document]({{< ref "docs/deployment/memory/mem_setup_tm" >}}) for more information.
2. Increase the memory size for shuffle data write. For large scale jobs, it's suggested to increase the total size of network memory, the larger the memory that can be used in the shuffle write phase, the more opportunities downstream to read data directly from memory. You need to ensure that each `Result Partition` can be allocated to at least `numSubpartition + 1` buffers, otherwise the "Insufficient number of network buffers" will be encountered.
3. Increase the memory size for shuffle data read. For large scale jobs, it's suggested to increase the size of the [shared read memory]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value (for example, 256M or 512M). Because this memory is cut from the framework off-heap memory, you must increase [taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) by the same size to avoid the direct memory OOM error.
4. When `Hybrid Shuffle` is used, decreasing the number of [exclusive buffers per channel]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) will seriously affect the performance. Therefore, this value should not be set to `0`, and for large-scale job, this can be appropriately increased. It should be also noted that, for hybrid shuffle, [taskmanager.network.memory.read-buffer.required-per-gate.max]({{< ref "docs/deployment/config" >}}#taskmanager-network-memory-read-buffer-required-per-gate-max) has been set to `Integer.MAX_VALUE` by default. It is better not to adjust this value, otherwise there is a risk of performance degradation.
{{< /tab >}}

{{< /tabs >}}
Expand Down