Skip to content

Commit

Permalink
[streaming] [scala] Scala wrapper for DataStreamUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
mbalassi committed Feb 3, 2016
1 parent 3750004 commit b436635
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 0 deletions.
137 changes: 137 additions & 0 deletions flink-contrib/flink-streaming-contrib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ under the License.
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
Expand Down Expand Up @@ -79,4 +84,136 @@ under the License.
</dependency>
</dependencies>

<build>
<plugins>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.4</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>

<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<jvmArgs>
<jvmArg>-Xms128m</jvmArg>
<jvmArg>-Xmx512m</jvmArg>
</jvmArgs>
<compilerPlugins combine.children="append">
<compilerPlugin>
<groupId>org.scalamacros</groupId>
<artifactId>paradise_${scala.version}</artifactId>
<version>${scala.macros.version}</version>
</compilerPlugin>
</compilerPlugins>
</configuration>
</plugin>

<!-- Eclipse Integration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
<projectnatures>
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
</projectnatures>
<buildcommands>
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<classpathContainers>
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
</classpathContainers>
<excludes>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
</excludes>
<sourceIncludes>
<sourceInclude>**/*.scala</sourceInclude>
<sourceInclude>**/*.java</sourceInclude>
</sourceIncludes>
</configuration>
</plugin>

<!-- Adding scala source directories to build path -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>0.5.0</version>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<verbose>false</verbose>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.scala

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.contrib.streaming.{DataStreamUtils => JavaStreamUtils}
import org.apache.flink.streaming.api.scala._

import _root_.scala.reflect.ClassTag
import scala.collection.JavaConverters._

package object utils {

/**
* This class provides simple utility methods for collecting a [[DataStream]],
* effectively enriching it with the functionality encapsulated by [[JavaStreamUtils]].
*
* @param self DataStream
*/
implicit class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {

/**
* Returns a scala iterator to iterate over the elements of the DataStream.
* @return The iterator
*/
def collect() : Iterator[T] = {
JavaStreamUtils.collect(self.javaStream).asScala
}

}

}

0 comments on commit b436635

Please sign in to comment.