Skip to content

Commit

Permalink
[FLINK-2613] [scala shell] Print usage information for Scala Shell
Browse files Browse the repository at this point in the history
  - Change startup code of scala shell
  - User has to specify local or remote mode explicitly now.

This closes #1106.
  • Loading branch information
nikste authored and chiwanpark committed Oct 9, 2015
1 parent 48f614c commit 7750a1f
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 69 deletions.
9 changes: 5 additions & 4 deletions docs/apis/scala_shell.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ Flink and setting up a cluster please refer to
To use the shell with an integrated Flink cluster just execute:

~~~bash
bin/start-scala-shell.sh
bin/start-scala-shell.sh local
~~~

in the root directory of your binary Flink directory.

To use it with a running cluster you can supply the host and port of the JobManager with:
To use it with a running cluster start the scala shell with the keyword `remote`
and supply the host and port of the JobManager with:

~~~bash
bin/start-scala-shell.sh --host <hostname> --port <portnumber>
bin/start-scala-shell.sh remote <hostname> <portnumber>
~~~

## Usage
Expand Down Expand Up @@ -75,6 +76,6 @@ It is possible to add external classpaths to the Scala-shell. These will be sent
Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.

~~~bash
bin/start-scala-shell --addclasspath <path/to/jar.jar>
bin/start-scala-shell.sh [local | remote <host> <port>] --addclasspath <path/to/jar.jar>
~~~

Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ class FlinkILoop(
out0: JPrintWriter)
extends ILoopCompat(in0, out0) {

def this(host:String,
port:Int,
externalJars : Option[Array[String]],
def this(host: String,
port: Int,
externalJars: Option[Array[String]],
in0: BufferedReader,
out: JPrintWriter){
this(host:String, port:Int, externalJars, Some(in0), out)
this(host: String, port: Int, externalJars, Some(in0), out)
}

def this(host:String, port:Int, externalJars : Option[Array[String]]){
this(host:String,port: Int, externalJars , None, new JPrintWriter(Console.out, true))
def this(host: String, port: Int, externalJars: Option[Array[String]]){
this(host: String, port: Int, externalJars, None, new JPrintWriter(Console.out, true))
}

def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,88 +18,136 @@

package org.apache.flink.api.scala

import java.io.{StringWriter, BufferedReader}

import org.apache.flink.api.common.ExecutionMode

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster

import scala.tools.nsc.Settings

import scala.tools.nsc.interpreter._


object FlinkShell {

object ExecutionMode extends Enumeration {
val UNDEFINED, LOCAL, REMOTE = Value
}

var bufferedReader: Option[BufferedReader] = None

def main(args: Array[String]) {

// scopt, command line arguments
case class Config(
port: Int = -1,
host: String = "none",
externalJars: Option[Array[String]] = None)
externalJars: Option[Array[String]] = None,
flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED)

val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
head ("Flink Scala Shell")

opt[Int] ('p', "port") action {
(x, c) =>
c.copy (port = x)
} text("port specifies port of running JobManager")

opt[(String)] ('h',"host") action {
case (x, c) =>
c.copy (host = x)
} text("host specifies host name of running JobManager")

opt[(String)] ('a',"addclasspath") action {
case (x,c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
} text("specifies additional jars to be used in Flink")

help("help") text("prints this usage text")
cmd("local") action {
(_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
} text("starts Flink scala shell with a local Flink cluster\n") children(
opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
case (x, c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
} text("specifies additional jars to be used in Flink\n")
)

cmd("remote") action { (_, c) =>
c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
} text("starts Flink scala shell connecting to a remote cluster\n") children(
arg[String]("<host>") action { (h, c) =>
c.copy(host = h) }
text("remote host name as string"),
arg[Int]("<port>") action { (p, c) =>
c.copy(port = p) }
text("remote port as integer\n"),
opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar>") action {
case (x, c) =>
val xArray = x.split(":")
c.copy(externalJars = Option(xArray))
} text("specifies additional jars to be used in Flink")
)
help("help") abbr("h") text("prints this usage text\n")
}


// parse arguments
parser.parse (args, Config () ) match {
parser.parse (args, Config()) match {
case Some(config) =>
startShell(config.host,config.port,config.externalJars)
startShell(config.host,
config.port,
config.flinkShellExecutionMode,
config.externalJars)

case _ => println("Could not parse program arguments")
}
}


def startShell(
userHost : String,
userPort : Int,
externalJars : Option[Array[String]] = None): Unit ={
userHost: String,
userPort: Int,
executionMode: ExecutionMode.Value,
externalJars: Option[Array[String]] = None): Unit ={

println("Starting Flink Shell:")

var cluster: LocalFlinkMiniCluster = null

// either port or userhost not specified by user, create new minicluster
val (host,port) = if (userHost == "none" || userPort == -1 ) {
println("Creating new local server")
cluster = new LocalFlinkMiniCluster(new Configuration, false)
cluster.start()
("localhost",cluster.getLeaderRPCPort)
} else {
println(s"Connecting to remote server (host: $userHost, port: $userPort).")
(userHost, userPort)
}

// custom shell
val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();

repl.settings = new Settings()

repl.settings.usejavacp.value = true

// start scala interpreter shell
repl.process(repl.settings)

//repl.closeInterpreter()

if (cluster != null) {
cluster.stop()
val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
executionMode match {
case ExecutionMode.LOCAL =>
val miniCluster = new LocalFlinkMiniCluster(new Configuration, false)
miniCluster.start()
val port = miniCluster.getLeaderRPCPort
println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
("localhost", port, Some(miniCluster))

case ExecutionMode.REMOTE =>
if (userHost == "none" || userPort == -1) {
println("Error: <host> or <port> not specified!")
return
} else {
println(s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
(userHost, userPort, None)
}

case ExecutionMode.UNDEFINED =>
println("Error: please specify execution mode:")
println("[local | remote <host> <port>]")
return
}

try {
// custom shell
val repl: FlinkILoop =
bufferedReader match {

case Some(br) =>
val out = new StringWriter()
new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out))

case None =>
new FlinkILoop(host, port, externalJars)
}

val settings = new Settings()

settings.usejavacp.value = true

// start scala interpreter shell
repl.process(settings)
} finally {
cluster match {
case Some(c) => c.stop()
case None =>
}
}

println(" good bye ..")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import scala.tools.nsc.Settings
@RunWith(classOf[JUnitRunner])
class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {

var cluster: Option[ForkableFlinkMiniCluster] = None
val parallelism = 4

test("Prevent re-creation of environment") {

val input: String =
Expand All @@ -51,7 +54,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
val input: String =
"""
val initial = env.fromElements(0)
val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
val result = iterationInput.map { i =>
val x = Math.random()
Expand All @@ -78,7 +80,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
val result = counts.print()
""".stripMargin
Expand Down Expand Up @@ -117,14 +118,11 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
val input =
"""
case class WC(word: String, count: Int)
val wordCounts = env.fromElements(
new WC("hello", 1),
new WC("world", 2),
new WC("world", 8))
val reduced = wordCounts.groupBy(0).sum(1)
reduced.print()
""".stripMargin

Expand Down Expand Up @@ -225,8 +223,50 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
out.toString + stdout
}

var cluster: Option[ForkableFlinkMiniCluster] = None
val parallelism = 4
/**
* tests flink shell startup with remote cluster (starts cluster internally)
*/
test("start flink scala shell with remote cluster") {

val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
"els.print\nError\n:q\n"

val in: BufferedReader = new BufferedReader(
new StringReader(
input + "\n"))
val out: StringWriter = new StringWriter

val baos: ByteArrayOutputStream = new ByteArrayOutputStream
val oldOut: PrintStream = System.out
System.setOut(new PrintStream(baos))

val (c, args) = cluster match{
case Some(cl) =>
val arg = Array("remote",
cl.hostname,
Integer.toString(cl.getLeaderRPCPort))
(cl, arg)
case None =>
fail("Cluster creation failed!")
}

//start scala shell with initialized
// buffered reader for testing
FlinkShell.bufferedReader = Some(in)
FlinkShell.main(args)
baos.flush()

val output: String = baos.toString
System.setOut(oldOut)

output should include("Job execution switched to status FINISHED.")
output should include("a\nb")

output should not include "Error"
output should not include "ERROR"
output should not include "Exception"
output should not include "failed"
}

override def beforeAll(): Unit = {
val cl = TestBaseUtils.startCluster(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala

import java.io._

import org.junit.runner.RunWith
import org.scalatest.{Matchers, FunSuite}
import org.scalatest.junit.JUnitRunner


@RunWith(classOf[JUnitRunner])
class ScalaShellLocalStartupITCase extends FunSuite with Matchers {

/**
* tests flink shell with local setup through startup script in bin folder
*/
test("start flink scala shell with local cluster") {

val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n"
val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
val out: StringWriter = new StringWriter
val baos: ByteArrayOutputStream = new ByteArrayOutputStream
val oldOut: PrintStream = System.out
System.setOut(new PrintStream(baos))
val args: Array[String] = Array("local")

//start flink scala shell
FlinkShell.bufferedReader = Some(in);
FlinkShell.main(args)

baos.flush()
val output: String = baos.toString
System.setOut(oldOut)

output should include("Job execution switched to status FINISHED.")
output should include("a\nb")

output should not include "Error"
output should not include "ERROR"
output should not include "Exception"
output should not include "failed"
}
}

0 comments on commit 7750a1f

Please sign in to comment.