From 375000432f9d5407e61b0914f9bf213614d797ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Balassi?= Date: Tue, 2 Feb 2016 14:10:47 +0100 Subject: [PATCH 1/2] [streaming] [scala] Revert removing getJavaStream() from DataStream --- .../org/apache/flink/streaming/api/scala/DataStream.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 69a8dc59e45e0..0c3d936ecc7a8 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -44,7 +44,7 @@ class DataStream[T](stream: JavaStream[T]) { /** * Gets the underlying java DataStream object. */ - private[flink] def javaStream: JavaStream[T] = stream + def javaStream: JavaStream[T] = stream /** * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]]. @@ -130,7 +130,7 @@ class DataStream[T](stream: JavaStream[T]) { case _ => throw new UnsupportedOperationException("Only supported for operators.") this } - + /** * Turns off chaining for this operator so thread co-location will not be * used as an optimization.

Chaining can be turned off for the whole From b436635db652526cd49a07b106f15371496d650d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Balassi?= Date: Tue, 2 Feb 2016 14:18:05 +0100 Subject: [PATCH 2/2] [streaming] [scala] Scala wrapper for DataStreamUtils --- flink-contrib/flink-streaming-contrib/pom.xml | 137 ++++++++++++++++++ .../streaming/scala/utils/package.scala | 48 ++++++ 2 files changed, 185 insertions(+) create mode 100644 flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml index e60833413d3ab..88ecf2003151b 100644 --- a/flink-contrib/flink-streaming-contrib/pom.xml +++ b/flink-contrib/flink-streaming-contrib/pom.xml @@ -42,6 +42,11 @@ under the License. flink-streaming-java_2.10 ${project.version} + + org.apache.flink + flink-streaming-scala_2.10 + ${project.version} + org.apache.flink flink-clients_2.10 @@ -79,4 +84,136 @@ under the License. + + + + + net.alchim31.maven + scala-maven-plugin + 3.1.4 + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} + + + + + + + + org.apache.maven.plugins + maven-eclipse-plugin + 2.8 + + true + + org.scala-ide.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + org.scala-ide.sdt.core.scalabuilder + + + org.scala-ide.sdt.launching.SCALA_CONTAINER + org.eclipse.jdt.launching.JRE_CONTAINER + + + org.scala-lang:scala-library + org.scala-lang:scala-compiler + + + **/*.scala + **/*.java + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + + add-source + generate-sources + + add-source + + + + src/main/scala + + + + + + add-test-source + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + + org.scalastyle + scalastyle-maven-plugin + 0.5.0 + + + + check + + + + + false + true + true + false + ${basedir}/src/main/scala + ${basedir}/src/test/scala + ${project.basedir}/../../tools/maven/scalastyle-config.xml + ${project.basedir}/scalastyle-output.xml + UTF-8 + + + + diff --git a/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala b/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala new file mode 100644 index 0000000000000..86a2bdcc23ea4 --- /dev/null +++ b/flink-contrib/flink-streaming-contrib/src/main/scala/org/apache/flink/contrib/streaming/scala/utils/package.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.scala + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.contrib.streaming.{DataStreamUtils => JavaStreamUtils} +import org.apache.flink.streaming.api.scala._ + +import _root_.scala.reflect.ClassTag +import scala.collection.JavaConverters._ + +package object utils { + + /** + * This class provides simple utility methods for collecting a [[DataStream]], + * effectively enriching it with the functionality encapsulated by [[JavaStreamUtils]]. + * + * @param self DataStream + */ + implicit class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) { + + /** + * Returns a scala iterator to iterate over the elements of the DataStream. + * @return The iterator + */ + def collect() : Iterator[T] = { + JavaStreamUtils.collect(self.javaStream).asScala + } + + } + +}