diff --git a/docs/content.zh/docs/dev/datastream/dataset_migration.md b/docs/content.zh/docs/dev/datastream/dataset_migration.md
new file mode 100644
index 0000000000000..25d1cb8f9a26b
--- /dev/null
+++ b/docs/content.zh/docs/dev/datastream/dataset_migration.md
@@ -0,0 +1,774 @@
+---
+title: "How to Migrate from DataSet to DataStream"
+weight: 302
+type: docs
+---
+
+
+# How to Migrate from DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in
+Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their
+data processing requirements.
+
+Note that the APIs in DataStream do not always match those in DataSet exactly. The purpose of this document is to help users understand
+how to achieve the same data processing behaviors with the DataStream API as with the DataSet API.
+
+According to the changes in coding and execution efficiency that are required for migration, we categorize the DataSet APIs into 4 categories:
+
+- Category 1: APIs that have an exact equivalent in DataStream and require barely any changes to migrate.
+
+- Category 2: APIs whose behavior can be achieved by other APIs with different semantics in DataStream. This might require some code changes for
+migration but will result in the same execution efficiency.
+
+- Category 3: APIs whose behavior can be achieved by other APIs with different semantics in DataStream, at a potential additional cost in execution efficiency.
+
+- Category 4: APIs whose behaviors are not supported by the DataStream API.
+
+The subsequent sections first introduce how to set up the execution environment and the sources/sinks, then provide detailed explanations of how to migrate
+each category of DataSet APIs to the DataStream API, highlighting the specific considerations and challenges associated with each
+category.
+
+
+## Setting the execution environment
+
+The first step of migrating an application from the DataSet API to the DataStream API is to replace `ExecutionEnvironment` with `StreamExecutionEnvironment`.
+
DataSetDataStream
+ {{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); + {{< /highlight >}} + + {{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); + {{< /highlight >}} +
+
+Unlike DataSet, DataStream supports processing on both bounded and unbounded data streams. Thus, users need to explicitly set the execution mode
+to `RuntimeExecutionMode.BATCH` if that is what they expect.
+
+```java
+StreamExecutionEnvironment executionEnvironment = // [...];
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Using the streaming sources and sinks
+
+### Sources
+
+The DataStream API uses `DataStreamSource` to read records from external systems, while the DataSet API uses `DataSource`.
+
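+Besides the legacy methods shown in the table below, bounded inputs can also be read with the unified `Source` API. The snippet below is only
+a minimal sketch, not part of the original DataSet-to-DataStream mapping: it assumes the `flink-connector-files` dependency is on the classpath
+and uses a made-up input path.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+// Bounded file source that reads the input line by line (org.apache.flink.connector.file.src.FileSource).
+// "/path/to/input" is a placeholder path.
+FileSource<String> fileSource =
+        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input"))
+                .build();
+
+// No watermarks are needed when running in batch mode.
+DataStreamSource<String> lines =
+        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");
+```
+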
DataSetDataStream
+ {{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataSource<> source = ExecutionEnvironment.createInput(inputFormat) + {{< /highlight >}} + + {{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) + {{< /highlight >}} +
+
+### Sinks
+
+The DataStream API uses `DataStreamSink` to write records to external systems, while the
+DataSet API uses `DataSink`.
+
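+Besides the legacy methods shown in the table below, results can also be written with the unified `Sink` API. The snippet below is a minimal
+sketch rather than an official recommendation: it assumes the `flink-connector-files` dependency and uses a made-up output path.
+
+```java
+DataStream<String> dataStream = // [...]
+
+// Row-encoded file sink that writes each record as a line of text (org.apache.flink.connector.file.sink.FileSink).
+// "/path/to/output" is a placeholder path.
+FileSink<String> fileSink =
+        FileSink.forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>())
+                .build();
+
+// Attach the sink to the stream.
+dataStream.sinkTo(fileSink);
+```
+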
DataSetDataStream
+ {{< highlight "java" >}} +// Write to outputformat +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); + {{< /highlight >}} + + {{< highlight "java" >}} +// Write to sink +DataStreamSink<> sink = dataStream.sinkTo(sink) +// Write to csv file +DataStreamSink<> sink = dataStream.writeAsCsv(path); +// Write to text file +DataStreamSink<> sink = dataStream.writeAsText(path); + {{< /highlight >}} +
+
+If you are looking for pre-defined source and sink connectors for DataStream, please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" >}}).
+
+## Migrating DataSet APIs
+
+### Category 1
+
+For Category 1, these DataSet APIs have an exact equivalent in DataStream and require barely any changes to migrate. The table below lists the direct mappings.
+
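+As an illustration of how little usually changes for this category, the following is a minimal sketch of a migrated grouped-sum job; the
+input collection and tuple types are made up for illustration.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+// Previously: ExecutionEnvironment#fromCollection + groupBy + aggregate(SUM, 1)
+DataStream<Tuple2<String, Integer>> words =
+        env.fromCollection(
+                Arrays.asList(Tuple2.of("flink", 1), Tuple2.of("spark", 1), Tuple2.of("flink", 1)));
+
+// Now: keyBy + sum; everything else stays the same.
+words.keyBy(value -> value.f0)
+        .sum(1)
+        .print();
+
+env.execute("dataset-to-datastream-category-1");
+```
+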
OperationsDataSetDataStream
Map + {{< highlight "java" >}} +dataSet.map(new MapFunction<>(){ +// implement user-defined map logic +}); + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream.map(new MapFunction<>(){ +// implement user-defined map logic +}); + {{< /highlight >}} +
FlatMap + {{< highlight "java" >}} +dataSet.flatMap(new FlatMapFunction<>(){ +// implement user-defined flatmap logic +}); + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream.flatMap(new FlatMapFunction<>(){ +// implement user-defined flatmap logic +}); + {{< /highlight >}} +
Filter + {{< highlight "java" >}} +dataSet.filter(new FilterFunction<>(){ +// implement user-defined filter logic +}); + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream.filter(new FilterFunction<>(){ +// implement user-defined filter logic +}); + {{< /highlight >}} +
Union + {{< highlight "java" >}} +dataSet1.union(dataSet2); + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream1.union(dataStream2); + {{< /highlight >}} +
Rebalance + {{< highlight "java" >}} +dataSet.rebalance(); + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream.rebalance(); + {{< /highlight >}} +
Project + {{< highlight "java" >}} +DataSet> dataSet = // [...] +dataSet.project(2,0); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +dataStream.project(2,0); + {{< /highlight >}} +
Reduce on Grouped DataSet + {{< highlight "java" >}} +DataSet> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} +
Aggregate on Grouped DataSet + {{< highlight "java" >}} +DataSet> dataSet = // [...] +// compute sum of the second field +dataSet.groupBy(value -> value.f0) + .aggregate(SUM, 1); +// compute min of the second field +dataSet.groupBy(value -> value.f0) + .aggregate(MIN, 1); +// compute max of the second field +dataSet.groupBy(value -> value.f0) + .aggregate(MAX, 1); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +// compute sum of the second field +dataStream.keyBy(value -> value.f0) + .sum(1); +// compute min of the second field +dataStream.keyBy(value -> value.f0) + .min(1); +// compute max of the second field +dataStream.keyBy(value -> value.f0) + .max(1); + {{< /highlight >}} +
+
+### Category 2
+
+For Category 2, the behavior of these DataSet APIs can be achieved by other APIs with different semantics in DataStream. This might require some code changes for
+migration but will result in the same execution efficiency.
+
+Operations on a full DataSet correspond to global window aggregations in DataStream with a custom window that is triggered at the end of the input. The `EndOfStreamWindows`
+class in the [Appendix]({{< ref "docs/dev/datastream/dataset_migration#endofstreamwindows" >}}) shows how such a window can be implemented. We will reuse it in the rest of this document.
+
OperationsDataSetDataStream
Distinct + {{< highlight "java" >}} +DataSet dataSet = // [...] +dataSet.distinct(); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream dataStream = // [...] +dataStream.keyBy(value -> value) + .reduce((value1, value2) -> value1); + {{< /highlight >}} +
Hash-Partition + {{< highlight "java" >}} +DataSet> dataSet = // [...] +dataSet.partitionByHash(value -> value.f0); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +// partition by the hashcode of key +dataStream.partitionCustom( + (key, numSubpartition) -> key.hashCode() % numSubpartition, + value -> value.f0); + {{< /highlight >}} +
Reduce on Full DataSet + {{< highlight "java" >}} +DataSet dataSet = // [...] +dataSet.reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream dataStream = // [...] +dataStream.windowAll(EndOfStreamWindows.get()) + .reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} +
Aggregate on Full DataSet + {{< highlight "java" >}} +DataSet> dataSet = // [...] +// compute sum of the second field +dataSet.aggregate(SUM, 1); +// compute min of the second field +dataSet.aggregate(MIN, 1); +// compute max of the second field +dataSet.aggregate(MAX, 1); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +// compute sum of the second field +dataStream.windowAll(EndOfStreamWindows.get()) + .sum(1); +// compute min of the second field +dataStream.windowAll(EndOfStreamWindows.get()) + .min(1); +// compute max of the second field +dataStream.windowAll(EndOfStreamWindows.get()) + .max(1); + {{< /highlight >}} +
GroupReduce on Full DataSet + {{< highlight "java" >}} +DataSet dataSet = // [...] +dataSet.reduceGroup(new GroupReduceFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream dataStream = // [...] +dataStream.windowAll(EndOfStreamWindows.get()) + .apply(new WindowFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} +
GroupReduce on Grouped DataSet + {{< highlight "java" >}} +DataSet> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduceGroup(new GroupReduceFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .window(EndOfStreamWindows.get()) + .apply(new WindowFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} +
First-n + {{< highlight "java" >}} +dataSet.first(n) + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream.windowAll(EndOfStreamWindows.get()) + .apply(new AllWindowFunction<>(){ + // implement first-n logic + }); + {{< /highlight >}} +
+Join
+        {{< highlight "java" >}}
+DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+dataSet1.join(dataSet2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .with(new JoinFunction<>(){
+        // implement user-defined join logic
+    });
+        {{< /highlight >}}
+
+        {{< highlight "java" >}}
+DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+dataStream1.join(dataStream2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply(new JoinFunction<>(){
+        // implement user-defined join logic
+    });
+        {{< /highlight >}}
+
+CoGroup
+        {{< highlight "java" >}}
+DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+dataSet1.coGroup(dataSet2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .with(new CoGroupFunction<>(){
+        // implement user-defined co-group logic
+    });
+        {{< /highlight >}}
+
+        {{< highlight "java" >}}
+DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+dataStream1.coGroup(dataStream2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply(new CoGroupFunction<>(){
+        // implement user-defined co-group logic
+    });
+        {{< /highlight >}}
+
+OuterJoin
+        {{< highlight "java" >}}
+DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+// left outer join
+dataSet1.leftOuterJoin(dataSet2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .with(new JoinFunction<>(){
+        // implement user-defined left outer join logic
+    });
+// right outer join
+dataSet1.rightOuterJoin(dataSet2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .with(new JoinFunction<>(){
+        // implement user-defined right outer join logic
+    });
+        {{< /highlight >}}
+
+        {{< highlight "java" >}}
+DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+// left outer join
+dataStream1.coGroup(dataStream2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply((leftIterable, rightIterable, collector) -> {
+        if (!rightIterable.iterator().hasNext()) {
+            // implement user-defined left outer join logic
+        }
+    });
+// right outer join
+dataStream1.coGroup(dataStream2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply((leftIterable, rightIterable, collector) -> {
+        if (!leftIterable.iterator().hasNext()) {
+            // implement user-defined right outer join logic
+        }
+    });
+        {{< /highlight >}}
+
+
+### Category 3
+
+For Category 3, the behavior of these DataSet APIs can be achieved by other APIs with different semantics in DataStream, at a potential additional cost in execution efficiency.
+
+Currently, the DataStream API does not directly support aggregations on non-keyed streams (subtask-scope aggregations). In order to do so, we need to first assign the subtask ID
+to the records, then turn the stream into a keyed stream. The `AddSubtaskIDMapFunction` in the [Appendix]({{< ref "docs/dev/datastream/dataset_migration#addsubtaskidmapfunction" >}}) shows how
+to do that, and we will reuse it in the rest of this document.
+
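+For example, the MapPartition replacement shown in the table below can be made concrete as follows. This is only a sketch of the pattern
+described above (it counts the records of each parallel subtask and assumes an `Integer` input stream); it reuses the `AddSubtaskIDMapFunction`
+and `EndOfStreamWindows` helpers from the Appendix.
+
+```java
+DataStream<Integer> source = // [...]
+
+// Key the records by the ID of the subtask that produced them, then aggregate per subtask.
+DataStream<Tuple2<String, Long>> countsPerSubtask =
+        source.map(new AddSubtaskIDMapFunction<Integer>())
+                .keyBy(value -> value.f0)
+                .window(EndOfStreamWindows.get())
+                .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Long>, String, TimeWindow>() {
+                    @Override
+                    public void apply(
+                            String subtaskId,
+                            TimeWindow window,
+                            Iterable<Tuple2<String, Integer>> records,
+                            Collector<Tuple2<String, Long>> out) {
+                        // count the records that belong to this subtask
+                        long count = 0;
+                        for (Tuple2<String, Integer> ignored : records) {
+                            count++;
+                        }
+                        out.collect(Tuple2.of(subtaskId, count));
+                    }
+                });
+```
+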
OperationsDataSetDataStream
+MapPartition/SortPartition
+        {{< highlight "java" >}}
+DataSet dataSet = // [...]
+// MapPartition
+dataSet.mapPartition(new MapPartitionFunction<>(){
+        // implement user-defined map partition logic
+});
+// SortPartition
+dataSet.sortPartition(0, Order.ASCENDING);
+dataSet.sortPartition(0, Order.DESCENDING);
+        {{< /highlight >}}
+
+        {{< highlight "java" >}}
+DataStream<Integer> dataStream = // [...]
+// assign a subtask ID to all records
+DataStream<Tuple2<String, Integer>> dataStream1 = dataStream.map(new AddSubtaskIDMapFunction());
+dataStream1.keyBy(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply(new WindowFunction<>(){
+        // implement user-defined map partition or sort partition logic
+    });
+        {{< /highlight >}}
+
+Cross
+        {{< highlight "java" >}}
+DataSet<Integer> dataSet1 = // [...]
+DataSet<Integer> dataSet2 = // [...]
+// Cross
+dataSet1.cross(dataSet2)
+    .with(new CrossFunction<>(){
+        // implement user-defined cross logic
+    });
+        {{< /highlight >}}
+
+        {{< highlight "java" >}}
+// the parallelism of dataStream1 and dataStream2 should be the same
+DataStream<Integer> dataStream1 = // [...]
+DataStream<Integer> dataStream2 = // [...]
+DataStream<Tuple2<String, Integer>> dataStream3 = dataStream1.broadcast().map(new AddSubtaskIDMapFunction());
+DataStream<Tuple2<String, Integer>> dataStream4 = dataStream2.map(new AddSubtaskIDMapFunction());
+// join the two streams according to the subtask ID
+dataStream3.join(dataStream4)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply(new JoinFunction<>(){
+        // implement user-defined cross logic
+    });
+        {{< /highlight >}}
+
+
+### Category 4
+
+The behaviors of the following DataSet APIs are not supported by the DataStream API.
+
+* RangePartition
+* GroupCombine
+
+
+## Appendix
+
+#### EndOfStreamWindows
+
+The following code shows an example implementation of `EndOfStreamWindows`.
+
+```java
+public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
+    private static final long serialVersionUID = 1L;
+
+    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();
+
+    private static final TimeWindow TIME_WINDOW_INSTANCE =
+            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);
+
+    private EndOfStreamWindows() {}
+
+    public static EndOfStreamWindows get() {
+        return INSTANCE;
+    }
+
+    @Override
+    public Collection<TimeWindow> assignWindows(
+            Object element, long timestamp, WindowAssignerContext context) {
+        return Collections.singletonList(TIME_WINDOW_INSTANCE);
+    }
+
+    @Override
+    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+        return new EndOfStreamTrigger();
+    }
+
+    @Override
+    public String toString() {
+        return "EndOfStreamWindows()";
+    }
+
+    @Override
+    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+        return new TimeWindow.Serializer();
+    }
+
+    @Override
+    public boolean isEventTime() {
+        return true;
+    }
+
+    @Internal
+    public static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
+        @Override
+        public TriggerResult onElement(
+                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
+                throws Exception {
+            return TriggerResult.CONTINUE;
+        }
+
+        @Override
+        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
+            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
+        }
+
+        @Override
+        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {}
+
+        @Override
+        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
+            return TriggerResult.CONTINUE;
+        }
+    }
+}
+```
+
+#### AddSubtaskIDMapFunction
+
+The following code shows an example implementation of `AddSubtaskIDMapFunction`.
+```java
+public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
+    @Override
+    public Tuple2<String, T> map(T value) {
+        return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
+    }
+}
+```
+
+{{< top >}}
diff --git a/docs/content/docs/dev/datastream/dataset_migration.md b/docs/content/docs/dev/datastream/dataset_migration.md
new file mode 100644
index 0000000000000..25d1cb8f9a26b
--- /dev/null
+++ b/docs/content/docs/dev/datastream/dataset_migration.md
@@ -0,0 +1,774 @@
+---
+title: "How to Migrate from DataSet to DataStream"
+weight: 302
+type: docs
+---
+
+
+# How to Migrate from DataSet to DataStream
+
+The DataSet API has been formally deprecated and will no longer receive active maintenance and support. It will be removed in
+Flink 2.0. Flink users are recommended to migrate from the DataSet API to the DataStream API, Table API and SQL for their
+data processing requirements.
+
+Note that the APIs in DataStream do not always match those in DataSet exactly. The purpose of this document is to help users understand
+how to achieve the same data processing behaviors with the DataStream API as with the DataSet API.
+
+According to the changes in coding and execution efficiency that are required for migration, we categorize the DataSet APIs into 4 categories:
+
+- Category 1: APIs that have an exact equivalent in DataStream and require barely any changes to migrate.
+
+- Category 2: APIs whose behavior can be achieved by other APIs with different semantics in DataStream. This might require some code changes for
+migration but will result in the same execution efficiency.
+
+- Category 3: APIs whose behavior can be achieved by other APIs with different semantics in DataStream, at a potential additional cost in execution efficiency.
+
+- Category 4: APIs whose behaviors are not supported by the DataStream API.
+
+The subsequent sections first introduce how to set up the execution environment and the sources/sinks, then provide detailed explanations of how to migrate
+each category of DataSet APIs to the DataStream API, highlighting the specific considerations and challenges associated with each
+category.
+
+
+## Setting the execution environment
+
+The first step of migrating an application from the DataSet API to the DataStream API is to replace `ExecutionEnvironment` with `StreamExecutionEnvironment`.
+
DataSetDataStream
+ {{< highlight "java" >}} +// Create the execution environment +ExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +ExecutionEnvironment.createLocalEnvironment(); +// Create the collection environment +new CollectionEnvironment(); +// Create the remote environment +ExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); + {{< /highlight >}} + + {{< highlight "java" >}} +// Create the execution environment +StreamExecutionEnvironment.getExecutionEnvironment(); +// Create the local execution environment +StreamExecutionEnvironment.createLocalEnvironment(); +// The collection environment is not supported. +// Create the remote environment +StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles); + {{< /highlight >}} +
+
+Unlike DataSet, DataStream supports processing on both bounded and unbounded data streams. Thus, users need to explicitly set the execution mode
+to `RuntimeExecutionMode.BATCH` if that is what they expect.
+
+```java
+StreamExecutionEnvironment executionEnvironment = // [...];
+executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+```
+
+## Using the streaming sources and sinks
+
+### Sources
+
+The DataStream API uses `DataStreamSource` to read records from external systems, while the DataSet API uses `DataSource`.
+
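+Besides the legacy methods shown in the table below, bounded inputs can also be read with the unified `Source` API. The snippet below is only
+a minimal sketch, not part of the original DataSet-to-DataStream mapping: it assumes the `flink-connector-files` dependency is on the classpath
+and uses a made-up input path.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+// Bounded file source that reads the input line by line (org.apache.flink.connector.file.src.FileSource).
+// "/path/to/input" is a placeholder path.
+FileSource<String> fileSource =
+        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input"))
+                .build();
+
+// No watermarks are needed when running in batch mode.
+DataStreamSource<String> lines =
+        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "file-source");
+```
+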
DataSetDataStream
+ {{< highlight "java" >}} +// Read data from file +DataSource<> source = ExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataSource<> source = ExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataSource<> source = ExecutionEnvironment.createInput(inputFormat) + {{< /highlight >}} + + {{< highlight "java" >}} +// Read data from file +DataStreamSource<> source = StreamExecutionEnvironment.readFile(inputFormat, filePath); +// Read data from collection +DataStreamSource<> source = StreamExecutionEnvironment.fromCollection(data); +// Read data from inputformat +DataStreamSource<> source = StreamExecutionEnvironment.createInput(inputFormat) + {{< /highlight >}} +
+
+### Sinks
+
+The DataStream API uses `DataStreamSink` to write records to external systems, while the
+DataSet API uses `DataSink`.
+
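+Besides the legacy methods shown in the table below, results can also be written with the unified `Sink` API. The snippet below is a minimal
+sketch rather than an official recommendation: it assumes the `flink-connector-files` dependency and uses a made-up output path.
+
+```java
+DataStream<String> dataStream = // [...]
+
+// Row-encoded file sink that writes each record as a line of text (org.apache.flink.connector.file.sink.FileSink).
+// "/path/to/output" is a placeholder path.
+FileSink<String> fileSink =
+        FileSink.forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>())
+                .build();
+
+// Attach the sink to the stream.
+dataStream.sinkTo(fileSink);
+```
+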
DataSetDataStream
+ {{< highlight "java" >}} +// Write to outputformat +DataSink<> sink = dataSet.output(outputFormat); +// Write to csv file +DataSink<> sink = dataSet.writeAsCsv(filePath); +// Write to text file +DataSink<> sink = dataSet.writeAsText(filePath); + {{< /highlight >}} + + {{< highlight "java" >}} +// Write to sink +DataStreamSink<> sink = dataStream.sinkTo(sink) +// Write to csv file +DataStreamSink<> sink = dataStream.writeAsCsv(path); +// Write to text file +DataStreamSink<> sink = dataStream.writeAsText(path); + {{< /highlight >}} +
+
+If you are looking for pre-defined source and sink connectors for DataStream, please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" >}}).
+
+## Migrating DataSet APIs
+
+### Category 1
+
+For Category 1, these DataSet APIs have an exact equivalent in DataStream and require barely any changes to migrate. The table below lists the direct mappings.
+
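+As an illustration of how little usually changes for this category, the following is a minimal sketch of a migrated grouped-sum job; the
+input collection and tuple types are made up for illustration.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+// Previously: ExecutionEnvironment#fromCollection + groupBy + aggregate(SUM, 1)
+DataStream<Tuple2<String, Integer>> words =
+        env.fromCollection(
+                Arrays.asList(Tuple2.of("flink", 1), Tuple2.of("spark", 1), Tuple2.of("flink", 1)));
+
+// Now: keyBy + sum; everything else stays the same.
+words.keyBy(value -> value.f0)
+        .sum(1)
+        .print();
+
+env.execute("dataset-to-datastream-category-1");
+```
+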
OperationsDataSetDataStream
Map + {{< highlight "java" >}} +dataSet.map(new MapFunction<>(){ +// implement user-defined map logic +}); + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream.map(new MapFunction<>(){ +// implement user-defined map logic +}); + {{< /highlight >}} +
FlatMap + {{< highlight "java" >}} +dataSet.flatMap(new FlatMapFunction<>(){ +// implement user-defined flatmap logic +}); + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream.flatMap(new FlatMapFunction<>(){ +// implement user-defined flatmap logic +}); + {{< /highlight >}} +
Filter + {{< highlight "java" >}} +dataSet.filter(new FilterFunction<>(){ +// implement user-defined filter logic +}); + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream.filter(new FilterFunction<>(){ +// implement user-defined filter logic +}); + {{< /highlight >}} +
Union + {{< highlight "java" >}} +dataSet1.union(dataSet2); + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream1.union(dataStream2); + {{< /highlight >}} +
Rebalance + {{< highlight "java" >}} +dataSet.rebalance(); + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream.rebalance(); + {{< /highlight >}} +
Project + {{< highlight "java" >}} +DataSet> dataSet = // [...] +dataSet.project(2,0); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +dataStream.project(2,0); + {{< /highlight >}} +
Reduce on Grouped DataSet + {{< highlight "java" >}} +DataSet> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} +
Aggregate on Grouped DataSet + {{< highlight "java" >}} +DataSet> dataSet = // [...] +// compute sum of the second field +dataSet.groupBy(value -> value.f0) + .aggregate(SUM, 1); +// compute min of the second field +dataSet.groupBy(value -> value.f0) + .aggregate(MIN, 1); +// compute max of the second field +dataSet.groupBy(value -> value.f0) + .aggregate(MAX, 1); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +// compute sum of the second field +dataStream.keyBy(value -> value.f0) + .sum(1); +// compute min of the second field +dataStream.keyBy(value -> value.f0) + .min(1); +// compute max of the second field +dataStream.keyBy(value -> value.f0) + .max(1); + {{< /highlight >}} +
+
+### Category 2
+
+For Category 2, the behavior of these DataSet APIs can be achieved by other APIs with different semantics in DataStream. This might require some code changes for
+migration but will result in the same execution efficiency.
+
+Operations on a full DataSet correspond to global window aggregations in DataStream with a custom window that is triggered at the end of the input. The `EndOfStreamWindows`
+class in the [Appendix]({{< ref "docs/dev/datastream/dataset_migration#endofstreamwindows" >}}) shows how such a window can be implemented. We will reuse it in the rest of this document.
+
OperationsDataSetDataStream
Distinct + {{< highlight "java" >}} +DataSet dataSet = // [...] +dataSet.distinct(); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream dataStream = // [...] +dataStream.keyBy(value -> value) + .reduce((value1, value2) -> value1); + {{< /highlight >}} +
Hash-Partition + {{< highlight "java" >}} +DataSet> dataSet = // [...] +dataSet.partitionByHash(value -> value.f0); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +// partition by the hashcode of key +dataStream.partitionCustom( + (key, numSubpartition) -> key.hashCode() % numSubpartition, + value -> value.f0); + {{< /highlight >}} +
Reduce on Full DataSet + {{< highlight "java" >}} +DataSet dataSet = // [...] +dataSet.reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream dataStream = // [...] +dataStream.windowAll(EndOfStreamWindows.get()) + .reduce(new ReduceFunction<>(){ + // implement user-defined reduce logic + }); + {{< /highlight >}} +
Aggregate on Full DataSet + {{< highlight "java" >}} +DataSet> dataSet = // [...] +// compute sum of the second field +dataSet.aggregate(SUM, 1); +// compute min of the second field +dataSet.aggregate(MIN, 1); +// compute max of the second field +dataSet.aggregate(MAX, 1); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +// compute sum of the second field +dataStream.windowAll(EndOfStreamWindows.get()) + .sum(1); +// compute min of the second field +dataStream.windowAll(EndOfStreamWindows.get()) + .min(1); +// compute max of the second field +dataStream.windowAll(EndOfStreamWindows.get()) + .max(1); + {{< /highlight >}} +
GroupReduce on Full DataSet + {{< highlight "java" >}} +DataSet dataSet = // [...] +dataSet.reduceGroup(new GroupReduceFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream dataStream = // [...] +dataStream.windowAll(EndOfStreamWindows.get()) + .apply(new WindowFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} +
GroupReduce on Grouped DataSet + {{< highlight "java" >}} +DataSet> dataSet = // [...] +dataSet.groupBy(value -> value.f0) + .reduceGroup(new GroupReduceFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} + + {{< highlight "java" >}} +DataStream> dataStream = // [...] +dataStream.keyBy(value -> value.f0) + .window(EndOfStreamWindows.get()) + .apply(new WindowFunction<>(){ + // implement user-defined group reduce logic + }); + {{< /highlight >}} +
First-n + {{< highlight "java" >}} +dataSet.first(n) + {{< /highlight >}} + + {{< highlight "java" >}} +dataStream.windowAll(EndOfStreamWindows.get()) + .apply(new AllWindowFunction<>(){ + // implement first-n logic + }); + {{< /highlight >}} +
+Join
+        {{< highlight "java" >}}
+DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+dataSet1.join(dataSet2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .with(new JoinFunction<>(){
+        // implement user-defined join logic
+    });
+        {{< /highlight >}}
+
+        {{< highlight "java" >}}
+DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+dataStream1.join(dataStream2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply(new JoinFunction<>(){
+        // implement user-defined join logic
+    });
+        {{< /highlight >}}
+
+CoGroup
+        {{< highlight "java" >}}
+DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+dataSet1.coGroup(dataSet2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .with(new CoGroupFunction<>(){
+        // implement user-defined co-group logic
+    });
+        {{< /highlight >}}
+
+        {{< highlight "java" >}}
+DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+dataStream1.coGroup(dataStream2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply(new CoGroupFunction<>(){
+        // implement user-defined co-group logic
+    });
+        {{< /highlight >}}
+
+OuterJoin
+        {{< highlight "java" >}}
+DataSet<Tuple2<String, Integer>> dataSet1 = // [...]
+DataSet<Tuple2<String, Integer>> dataSet2 = // [...]
+// left outer join
+dataSet1.leftOuterJoin(dataSet2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .with(new JoinFunction<>(){
+        // implement user-defined left outer join logic
+    });
+// right outer join
+dataSet1.rightOuterJoin(dataSet2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .with(new JoinFunction<>(){
+        // implement user-defined right outer join logic
+    });
+        {{< /highlight >}}
+
+        {{< highlight "java" >}}
+DataStream<Tuple2<String, Integer>> dataStream1 = // [...]
+DataStream<Tuple2<String, Integer>> dataStream2 = // [...]
+// left outer join
+dataStream1.coGroup(dataStream2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply((leftIterable, rightIterable, collector) -> {
+        if (!rightIterable.iterator().hasNext()) {
+            // implement user-defined left outer join logic
+        }
+    });
+// right outer join
+dataStream1.coGroup(dataStream2)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply((leftIterable, rightIterable, collector) -> {
+        if (!leftIterable.iterator().hasNext()) {
+            // implement user-defined right outer join logic
+        }
+    });
+        {{< /highlight >}}
+
+
+### Category 3
+
+For Category 3, the behavior of these DataSet APIs can be achieved by other APIs with different semantics in DataStream, at a potential additional cost in execution efficiency.
+
+Currently, the DataStream API does not directly support aggregations on non-keyed streams (subtask-scope aggregations). In order to do so, we need to first assign the subtask ID
+to the records, then turn the stream into a keyed stream. The `AddSubtaskIDMapFunction` in the [Appendix]({{< ref "docs/dev/datastream/dataset_migration#addsubtaskidmapfunction" >}}) shows how
+to do that, and we will reuse it in the rest of this document.
+
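+For example, the MapPartition replacement shown in the table below can be made concrete as follows. This is only a sketch of the pattern
+described above (it counts the records of each parallel subtask and assumes an `Integer` input stream); it reuses the `AddSubtaskIDMapFunction`
+and `EndOfStreamWindows` helpers from the Appendix.
+
+```java
+DataStream<Integer> source = // [...]
+
+// Key the records by the ID of the subtask that produced them, then aggregate per subtask.
+DataStream<Tuple2<String, Long>> countsPerSubtask =
+        source.map(new AddSubtaskIDMapFunction<Integer>())
+                .keyBy(value -> value.f0)
+                .window(EndOfStreamWindows.get())
+                .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Long>, String, TimeWindow>() {
+                    @Override
+                    public void apply(
+                            String subtaskId,
+                            TimeWindow window,
+                            Iterable<Tuple2<String, Integer>> records,
+                            Collector<Tuple2<String, Long>> out) {
+                        // count the records that belong to this subtask
+                        long count = 0;
+                        for (Tuple2<String, Integer> ignored : records) {
+                            count++;
+                        }
+                        out.collect(Tuple2.of(subtaskId, count));
+                    }
+                });
+```
+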
OperationsDataSetDataStream
+MapPartition/SortPartition
+        {{< highlight "java" >}}
+DataSet dataSet = // [...]
+// MapPartition
+dataSet.mapPartition(new MapPartitionFunction<>(){
+        // implement user-defined map partition logic
+});
+// SortPartition
+dataSet.sortPartition(0, Order.ASCENDING);
+dataSet.sortPartition(0, Order.DESCENDING);
+        {{< /highlight >}}
+
+        {{< highlight "java" >}}
+DataStream<Integer> dataStream = // [...]
+// assign a subtask ID to all records
+DataStream<Tuple2<String, Integer>> dataStream1 = dataStream.map(new AddSubtaskIDMapFunction());
+dataStream1.keyBy(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply(new WindowFunction<>(){
+        // implement user-defined map partition or sort partition logic
+    });
+        {{< /highlight >}}
+
+Cross
+        {{< highlight "java" >}}
+DataSet<Integer> dataSet1 = // [...]
+DataSet<Integer> dataSet2 = // [...]
+// Cross
+dataSet1.cross(dataSet2)
+    .with(new CrossFunction<>(){
+        // implement user-defined cross logic
+    });
+        {{< /highlight >}}
+
+        {{< highlight "java" >}}
+// the parallelism of dataStream1 and dataStream2 should be the same
+DataStream<Integer> dataStream1 = // [...]
+DataStream<Integer> dataStream2 = // [...]
+DataStream<Tuple2<String, Integer>> dataStream3 = dataStream1.broadcast().map(new AddSubtaskIDMapFunction());
+DataStream<Tuple2<String, Integer>> dataStream4 = dataStream2.map(new AddSubtaskIDMapFunction());
+// join the two streams according to the subtask ID
+dataStream3.join(dataStream4)
+    .where(value -> value.f0)
+    .equalTo(value -> value.f0)
+    .window(EndOfStreamWindows.get())
+    .apply(new JoinFunction<>(){
+        // implement user-defined cross logic
+    });
+        {{< /highlight >}}
+
+
+### Category 4
+
+The behaviors of the following DataSet APIs are not supported by the DataStream API.
+
+* RangePartition
+* GroupCombine
+
+
+## Appendix
+
+#### EndOfStreamWindows
+
+The following code shows an example implementation of `EndOfStreamWindows`.
+
+```java
+public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
+    private static final long serialVersionUID = 1L;
+
+    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();
+
+    private static final TimeWindow TIME_WINDOW_INSTANCE =
+            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);
+
+    private EndOfStreamWindows() {}
+
+    public static EndOfStreamWindows get() {
+        return INSTANCE;
+    }
+
+    @Override
+    public Collection<TimeWindow> assignWindows(
+            Object element, long timestamp, WindowAssignerContext context) {
+        return Collections.singletonList(TIME_WINDOW_INSTANCE);
+    }
+
+    @Override
+    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+        return new EndOfStreamTrigger();
+    }
+
+    @Override
+    public String toString() {
+        return "EndOfStreamWindows()";
+    }
+
+    @Override
+    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+        return new TimeWindow.Serializer();
+    }
+
+    @Override
+    public boolean isEventTime() {
+        return true;
+    }
+
+    @Internal
+    public static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
+        @Override
+        public TriggerResult onElement(
+                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
+                throws Exception {
+            return TriggerResult.CONTINUE;
+        }
+
+        @Override
+        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
+            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
+        }
+
+        @Override
+        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {}
+
+        @Override
+        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
+            return TriggerResult.CONTINUE;
+        }
+    }
+}
+```
+
+#### AddSubtaskIDMapFunction
+
+The following code shows an example implementation of `AddSubtaskIDMapFunction`.
+```java
+public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
+    @Override
+    public Tuple2<String, T> map(T value) {
+        return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
+    }
+}
+```
+
+{{< top >}}