From d1fc38c349ac1a0e2eebe4300faa051ac9c297cb Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 26 Feb 2016 20:56:40 +0100 Subject: [PATCH] [FLINK-3527] Add Scala DataStream.transform() This implicitly adds KeyedStream.transform() and also explicitly ConnectedStreams.transform(). This also removes the transform exclusions from the API completeness tests. --- .../streaming/api/scala/ConnectedStreams.scala | 13 +++++++++++-- .../flink/streaming/api/scala/DataStream.scala | 15 +++++++++++++++ .../scala/StreamingScalaAPICompletenessTest.scala | 3 --- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala index a80937cd7b2ff..669f12e58c117 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala @@ -18,12 +18,14 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.annotation.{Internal, Public} +import org.apache.flink.annotation.{PublicEvolving, Internal, Public} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream} +import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream, SingleOutputStreamOperator, KeyedStream} import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction} +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator +import org.apache.flink.streaming.api.transformations.TwoInputTransformation import org.apache.flink.util.Collector /** @@ -269,6 +271,13 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { private[flink] def clean[F <: AnyRef](f: F): F = { new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f) } + + @PublicEvolving + def transform[R: TypeInformation]( + functionName: String, + operator: TwoInputStreamOperator[IN1, IN2, R]): DataStream[R] = { + asScalaStream(javaStream.transform(functionName, implicitly[TypeInformation[R]], operator)) + } } @Internal diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 3522b51d3e1fb..197cebd9dc184 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _} import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, AssignerWithPeriodicWatermarks, AscendingTimestampExtractor, TimestampExtractor} +import org.apache.flink.streaming.api.operators.OneInputStreamOperator +import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window} @@ -897,4 +899,17 @@ class DataStream[T](stream: JavaStream[T]) { new StreamExecutionEnvironment(stream.getExecutionEnvironment).scalaClean(f) } + /** + * Transforms the [[DataStream]] by using a custom [[OneInputStreamOperator]]. + * + * @param operatorName name of the operator, for logging purposes + * @param operator the object containing the transformation logic + * @tparam R the type of elements emitted by the operator + */ + @PublicEvolving + def transform[R: TypeInformation]( + operatorName: String, + operator: OneInputStreamOperator[T, R]): DataStream[R] = { + asScalaStream(stream.transform(operatorName, implicitly[TypeInformation[R]], operator)) + } } diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index 7ba3194dc4352..415f057c85b39 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -39,7 +39,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { // private[flink]. "org.apache.flink.streaming.api.datastream.DataStream.getType", "org.apache.flink.streaming.api.datastream.DataStream.copy", - "org.apache.flink.streaming.api.datastream.DataStream.transform", "org.apache.flink.streaming.api.datastream.DataStream.getTransformation", "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.copy", "org.apache.flink.streaming.api.datastream.ConnectedStreams.getExecutionEnvironment", @@ -49,7 +48,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType1", "org.apache.flink.streaming.api.datastream.ConnectedStreams.getType2", "org.apache.flink.streaming.api.datastream.ConnectedStreams.addGeneralWindowCombine", - "org.apache.flink.streaming.api.datastream.ConnectedStreams.transform", "org.apache.flink.streaming.api.datastream.WindowedDataStream.getType", "org.apache.flink.streaming.api.datastream.WindowedDataStream.getExecutionConfig", @@ -59,7 +57,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { "org.apache.flink.streaming.api.datastream.AllWindowedStream.getExecutionEnvironment", "org.apache.flink.streaming.api.datastream.AllWindowedStream.getInputType", - "org.apache.flink.streaming.api.datastream.KeyedStream.transform", "org.apache.flink.streaming.api.datastream.KeyedStream.getKeySelector", "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.isChainingEnabled",