From a0ad8fbc90262d6ceb8699b02e5a9661f836d405 Mon Sep 17 00:00:00 2001 From: Chiwan Park Date: Sat, 26 Sep 2015 02:47:06 +0900 Subject: [PATCH 1/3] [FLINK-2767] [scala shell] Add Scala 2.11 support to Scala shell --- flink-dist/pom.xml | 29 ++++----------- flink-staging/flink-scala-shell/pom.xml | 1 + .../apache/flink/api/scala/ILoopCompat.scala | 29 +++++++++++++++ .../apache/flink/api/scala/ILoopCompat.scala | 31 ++++++++++++++++ .../api/scala/FlinkILoop.scala | 36 +++++++++++-------- .../flink/api/scala/ScalaShellITSuite.scala | 2 +- flink-staging/pom.xml | 18 +--------- 7 files changed, 90 insertions(+), 56 deletions(-) create mode 100644 flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala create mode 100644 flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index bf7b9dfd39c0b..123a91a5541bb 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -125,33 +125,16 @@ under the License. ${project.version} + + org.apache.flink + flink-scala-shell + ${project.version} + + - - scala-2.10 - - - - - !scala-2.11 - - - - - 2.10.4 - 2.10 - - - - - org.apache.flink - flink-scala-shell - ${project.version} - - - include-yarn diff --git a/flink-staging/flink-scala-shell/pom.xml b/flink-staging/flink-scala-shell/pom.xml index 94718a3a66499..7a8b146911226 100644 --- a/flink-staging/flink-scala-shell/pom.xml +++ b/flink-staging/flink-scala-shell/pom.xml @@ -180,6 +180,7 @@ under the License. src/main/scala + src/main/scala-${scala.binary.version} diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala new file mode 100644 index 0000000000000..797b420cd8b70 --- /dev/null +++ b/flink-staging/flink-scala-shell/src/main/scala-2.10/org/apache/flink/api/scala/ILoopCompat.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io.BufferedReader + +import _root_.scala.tools.nsc.interpreter._ + +class ILoopCompat( + in0: Option[BufferedReader], + out0: JPrintWriter) + extends ILoop(in0, out0) { +} diff --git a/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala new file mode 100644 index 0000000000000..c1be6db99c099 --- /dev/null +++ b/flink-staging/flink-scala-shell/src/main/scala-2.11/org/apache/flink/api/scala/ILoopCompat.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala + +import java.io.BufferedReader + +import _root_.scala.tools.nsc.interpreter._ + +class ILoopCompat( + in0: Option[BufferedReader], + out0: JPrintWriter) + extends ILoop(in0, out0) { + + protected def addThunk(f: => Unit): Unit = f +} diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala index 1e96ba3aff2ef..67cd1a496cf4b 100644 --- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala +++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala @@ -32,10 +32,8 @@ class FlinkILoop( val externalJars: Option[Array[String]], in0: Option[BufferedReader], out0: JPrintWriter) - extends ILoop(in0, out0) { - - - + extends ILoopCompat(in0, out0) { + def this(host:String, port:Int, externalJars : Option[Array[String]], @@ -51,6 +49,7 @@ class FlinkILoop( def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){ this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter) } + // remote environment private val remoteEnv: ScalaShellRemoteEnvironment = { val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this) @@ -64,17 +63,6 @@ class FlinkILoop( scalaEnv } - addThunk { - intp.beQuietDuring { - // automatically imports the flink scala api - intp.addImports("org.apache.flink.api.scala._") - intp.addImports("org.apache.flink.api.common.functions._") - // with this we can access this object in the scala shell - intp.bindValue("env", this.scalaEnv) - } - } - - /** * creates a temporary directory to store compiled console files */ @@ -100,6 +88,24 @@ class FlinkILoop( new File(tmpDirBase, "scala_shell_commands.jar") } + private val packageImports = Seq[String]( + "org.apache.flink.api.scala._", + "org.apache.flink.api.common.functions._" + ) + + override def createInterpreter(): Unit = { + super.createInterpreter() + + addThunk { + intp.beQuietDuring { + // import dependencies + intp.interpret("import " + packageImports.mkString(", ")) + + // set execution environment + intp.bind("env", this.scalaEnv) + } + } + } /** * Packages the compiled classes of the current shell session into a Jar file for execution diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala index 7648c50f8d087..0621351b2c4e6 100644 --- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala +++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala @@ -22,7 +22,7 @@ import java.io._ import java.util.concurrent.TimeUnit import org.apache.flink.runtime.StreamingMode -import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils, TestEnvironment} +import org.apache.flink.test.util.{TestEnvironment, ForkableFlinkMiniCluster, TestBaseUtils} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml index a0cda670054ed..5a73141aabe5f 100644 --- a/flink-staging/pom.xml +++ b/flink-staging/pom.xml @@ -47,6 +47,7 @@ under the License. flink-ml flink-language-binding flink-gelly-scala + flink-scala-shell @@ -72,22 +73,5 @@ under the License. flink-tez - - scala-2.10 - - - - - !scala-2.11 - - - - 2.10.4 - 2.10 - - - flink-scala-shell - - From b60294b16e32d10b9e860140c1462f7db50cca9c Mon Sep 17 00:00:00 2001 From: Chiwan Park Date: Wed, 30 Sep 2015 11:22:16 +0200 Subject: [PATCH 2/3] update package directory name --- .../apache/flink}/api/scala/FlinkILoop.scala | 4 ++-- .../apache/flink}/api/scala/FlinkShell.scala | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) rename flink-staging/flink-scala-shell/src/main/scala/{org.apache.flink => org/apache/flink}/api/scala/FlinkILoop.scala (100%) rename flink-staging/flink-scala-shell/src/main/scala/{org.apache.flink => org/apache/flink}/api/scala/FlinkShell.scala (99%) diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala similarity index 100% rename from flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala rename to flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala index 67cd1a496cf4b..88fcb283419d8 100644 --- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala +++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala @@ -20,11 +20,11 @@ package org.apache.flink.api.scala import java.io.{BufferedReader, File, FileOutputStream} -import scala.tools.nsc.interpreter._ - import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment} import org.apache.flink.util.AbstractID +import scala.tools.nsc.interpreter._ + class FlinkILoop( val host: String, diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala similarity index 99% rename from flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala rename to flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index a4fae91e515d3..224983b547f46 100644 --- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala +++ b/flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -18,12 +18,11 @@ package org.apache.flink.api.scala - -import scala.tools.nsc.Settings - import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster +import scala.tools.nsc.Settings + object FlinkShell { From bf49cebcac8cf9156454c1d126f2e8dbb44ab02b Mon Sep 17 00:00:00 2001 From: Chiwan Park Date: Mon, 5 Oct 2015 19:28:41 +0200 Subject: [PATCH 3/3] update Scala 2.11 version and jline dependency --- flink-staging/flink-scala-shell/pom.xml | 12 ++++++------ pom.xml | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flink-staging/flink-scala-shell/pom.xml b/flink-staging/flink-scala-shell/pom.xml index 7a8b146911226..5adb8c64b04ab 100644 --- a/flink-staging/flink-scala-shell/pom.xml +++ b/flink-staging/flink-scala-shell/pom.xml @@ -76,12 +76,6 @@ under the License. ${scala.version} - - org.scala-lang - jline - 2.10.4 - - org.apache.flink @@ -275,6 +269,12 @@ under the License. quasiquotes_${scala.binary.version} ${scala.macros.version} + + + org.scala-lang + jline + 2.10.4 + diff --git a/pom.xml b/pom.xml index 6f1a137a4ad1a..fbe27d23807be 100644 --- a/pom.xml +++ b/pom.xml @@ -363,7 +363,7 @@ under the License. - 2.11.4 + 2.11.7 2.11