97 changes: 46 additions & 51 deletions docs/content.zh/docs/dev/datastream/dataset_migration.md
---
title: "How to Migrate from DataSet to DataStream"
weight: 302
type: docs
---
specific language governing permissions and limitations
under the License.
-->

# How to Migrate from DataSet to DataStream

The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in
Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their
data processing requirements.

Note that the APIs in DataStream do not always match those in DataSet exactly. The purpose of this document is to help users understand
how to achieve the same data processing behaviors with the DataStream API as with the DataSet API.

Based on the changes in coding and execution efficiency required for migration, we categorize the DataSet APIs into four categories:

- Category 1: APIs that have an exact equivalent in DataStream, requiring barely any changes to migrate.

- Category 2: APIs whose behavior can be achieved by other APIs with different semantics in DataStream, which might require some code changes for
migration but result in the same execution efficiency.

- Category 3: APIs whose behavior can be achieved by other APIs with different semantics in DataStream, with potentially additional cost in execution efficiency.

- Category 4: APIs whose behaviors are not supported by DataStream.

The subsequent sections first introduce how to set up the execution environment and the sources/sinks, then explain in detail how to migrate
each category of DataSet APIs to the DataStream API, highlighting the specific considerations and challenges associated with each
category.


## Setting the execution environment

The first step of migrating an application from the DataSet API to the DataStream API is to replace `ExecutionEnvironment` with `StreamExecutionEnvironment`.

<table class="table table-bordered">
<thead>
Expand All @@ -62,44 +58,45 @@ The first step of migrating an application from DataSet API to DataStream API is
<tr>
<td>
{{< highlight "java" >}}
// Create the execution environment
ExecutionEnvironment.getExecutionEnvironment();
// Create the local execution environment
ExecutionEnvironment.createLocalEnvironment();
// Create the collection environment
new CollectionEnvironment();
// Create the remote environment
ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);
{{< /highlight >}}
</td>
<td>
{{< highlight "java" >}}
// Create the execution environment
StreamExecutionEnvironment.getExecutionEnvironment();
// Create the local execution environment
StreamExecutionEnvironment.createLocalEnvironment();
// The collection environment is not supported.
// Create the remote environment
StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles);
{{< /highlight >}}
</td>
</tr>
</tbody>
</table>

Unlike DataSet, DataStream supports processing on both bounded and unbounded data streams. Thus, the user needs to explicitly set the execution mode
to `RuntimeExecutionMode.BATCH` if that is expected.

```java
StreamExecutionEnvironment executionEnvironment = // [...];
executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
```

## Using the streaming sources and sinks

### Sources

The DataStream API uses `DataStreamSource` to read records from an external system, while the DataSet API uses `DataSource`.
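As a hedged sketch (the `FileSource` and `TextLineInputFormat` classes come from the `flink-connector-files` module, and the input path is a placeholder), reading a bounded text file through the DataStream API might look like this:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A bounded file source; plays the role the DataSet-era DataSource used to play.
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/input"))
                .build();

        // No watermarks are needed for batch-style processing of bounded input.
        DataStreamSource<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

        lines.print();
        env.execute("file-source-sketch");
    }
}
```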

<table class="table table-bordered">
<thead>

### Sinks

The DataStream API uses `DataStreamSink` to write records to an external system, while the DataSet API uses `DataSink`.
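Correspondingly, a hedged sketch of writing records out with the unified `FileSink` (also from `flink-connector-files`; the output path and sample records are placeholders) might be:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> records = env.fromElements("a", "b", "c");

        // A row-encoded file sink; plays the role the DataSet-era DataSink used to play.
        FileSink<String> sink = FileSink
                .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>())
                .build();

        records.sinkTo(sink);
        env.execute("file-sink-sketch");
    }
}
```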

<table class="table table-bordered">
<thead>
</tbody>
</table>

If you are looking for pre-defined source and sink connectors for DataStream, please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" >}}).

## Migrating DataSet APIs

### Category 1

For Category 1, these DataSet APIs have an exact equivalent in DataStream, requiring barely any changes to migrate.
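For instance (a minimal sketch; the element values are made up for illustration), a `map`/`filter` chain migrates essentially verbatim:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CategoryOneSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // DataSet version:    dataSet.map(v -> v * 2).filter(v -> v > 4)
        // DataStream version: the same call chain, unchanged.
        DataStream<Integer> result = env.fromElements(1, 2, 3)
                .map(v -> v * 2)
                .filter(v -> v > 4);

        result.print();
        env.execute("category-one-sketch");
    }
}
```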

<table class="table table-bordered">
<thead>
</tbody>
</table>

### Category 2

For Category 2, the behavior of these DataSet APIs can be achieved by other APIs with different semantics in DataStream, which might require some code changes for
migration but results in the same execution efficiency.

Operations on a full DataSet correspond to a global window aggregation in DataStream, with a custom window that is triggered only at the end of the inputs. The `EndOfStreamWindows`
in the [Appendix]({{< ref "docs/dev/datastream/dataset_migration#endofstreamwindows" >}}) shows how such a window can be implemented. We will reuse it in the rest of this document.
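As a hedged sketch, a full-DataSet `reduce` could be migrated like this (assuming the `EndOfStreamWindows` implementation from the appendix is on the classpath and exposes a public no-arg constructor; the exact way it is instantiated may differ):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CategoryTwoSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // DataSet version:    dataSet.reduce((a, b) -> a + b)
        // DataStream version: aggregate inside one global window that
        // only fires once the bounded input ends.
        DataStream<Integer> sum = env.fromElements(1, 2, 3, 4)
                .windowAll(new EndOfStreamWindows()) // window assigner from the appendix
                .reduce((a, b) -> a + b);

        sum.print(); // should emit a single record: the total sum
        env.execute("category-two-sketch");
    }
}
```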

<table class="table table-bordered">
<thead>
</tbody>
</table>

### Category 3

For Category 3, the behavior of these DataSet APIs can be achieved by other APIs with different semantics in DataStream, with potentially additional cost in execution efficiency.

Currently, the DataStream API does not directly support aggregations on non-keyed streams (subtask-scope aggregations). In order to do so, we need to first assign the subtask ID
to the records, then turn the stream into a keyed stream. The `AddSubtaskIDMapFunction` in the [Appendix]({{< ref "docs/dev/datastream/dataset_migration#addsubtaskidmapfunction" >}}) shows how
to do that, and we will reuse it in the rest of this document.
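The idea can be sketched as follows (a hedged example: it assumes both `AddSubtaskIDMapFunction` and `EndOfStreamWindows` from the appendix are on the classpath, with the instantiation shown; the element values are placeholders):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CategoryThreeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Subtask-scope sum: tag each record with its subtask ID, key by the
        // tag so records from one subtask form one group, and aggregate per
        // key once the bounded input ends.
        DataStream<Tuple2<String, Integer>> perSubtaskSum = env.fromElements(1, 2, 3, 4)
                .map(new AddSubtaskIDMapFunction<>())     // helper from the appendix
                .keyBy(tagged -> tagged.f0)
                .window(new EndOfStreamWindows())         // window assigner from the appendix
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

        perSubtaskSum.print();
        env.execute("category-three-sketch");
    }
}
```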

<table class="table table-bordered">
<thead>
</tbody>
</table>

### Category 4

The behaviors of the following DataSet APIs are not supported by DataStream.

* RangePartition
* GroupCombine


## Appendix

#### EndOfStreamWindows

The following code shows an example implementation of `EndOfStreamWindows`.

```java
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
    // ... (remainder of the implementation collapsed in the diff view)
}
```

#### AddSubtaskIDMapFunction

The following code shows an example implementation of `AddSubtaskIDMapFunction`.

```java
public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
@Override
    // ... (remainder of the implementation collapsed in the diff view)
}
```