Skip to content

Commit

Permalink
added integration tests for local and remote startup
Browse files Browse the repository at this point in the history
  • Loading branch information
nikste committed Sep 17, 2015
1 parent 5ebfabe commit 135ca75
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@
package org.apache.flink.api.scala


import java.io.{StringWriter, BufferedReader}

import scala.tools.nsc.Settings

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster

import scala.tools.nsc.interpreter._


object FlinkShell {

val LOCAL = 0;
val REMOTE = 1;
val UNDEFINED = -1;

var bufferedReader: BufferedReader = null;

def main(args: Array[String]) {

Expand Down Expand Up @@ -118,8 +123,13 @@ object FlinkShell {


// custom shell
val repl = new FlinkILoop(host, port, externalJars) //new MyILoop();

var repl : FlinkILoop = null;
if(bufferedReader == null) {
repl = new FlinkILoop(host, port, externalJars) //new MyILoop();
}else{
val out = new StringWriter()
repl = new FlinkILoop(host, port, externalJars,bufferedReader, new JPrintWriter(out));
}
repl.settings = new Settings()

repl.settings.usejavacp.value = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,49 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll {
out.toString + stdout
}

/**
* tests flink shell startup with remote cluster (starts cluster internally)
*/
test("start flink scala shell with remote cluster") {

val input: String = "val els = env.fromElements(\"a\",\"b\");\n" +
"els.print\nError\n:q\n"

val in: BufferedReader = new BufferedReader(
new StringReader(
input + "\n"))
val out: StringWriter = new StringWriter

val baos: ByteArrayOutputStream = new ByteArrayOutputStream
val oldOut: PrintStream = System.out
System.setOut(new PrintStream(baos))
val c = cluster.getOrElse(null);
var args : Array[String] = null;
if(c != null){
args = Array("remote",
c.hostname,
Integer.toString(c.getLeaderRPCPort))
}
else{
assert(false)
}

//start scala shell with initialized
// buffered reader for testing
FlinkShell.bufferedReader = in;
FlinkShell.main(args)
baos.flush

val output: String = baos.toString
System.setOut(oldOut)

assert((!output.contains("Error")))
assert((!output.contains("ERROR")))
assert((!output.contains("Exception")))
assert((!output.contains("failed")))
}


var cluster: Option[ForkableFlinkMiniCluster] = None
val parallelism = 4

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala

import java.io._

import org.junit.runner.RunWith
import org.scalatest.{Matchers, FunSuite}
import org.scalatest.junit.JUnitRunner


@RunWith(classOf[JUnitRunner])
class ScalaShellLocalStartupITCase extends FunSuite with Matchers {

/**
* tests flink shell with local setup through startup script in bin folder
*/
test("start flink scala shell with local cluster") {

val input: String = "val els = env.fromElements(\"a\",\"b\");\n" + "els.print\nError\n:q\n"
val in: BufferedReader = new BufferedReader(new StringReader(input + "\n"))
val out: StringWriter = new StringWriter
val baos: ByteArrayOutputStream = new ByteArrayOutputStream
val oldOut: PrintStream = System.out
System.setOut(new PrintStream(baos))
val args: Array[String] = Array("local")

//start flink scala shell
FlinkShell.bufferedReader = in;
FlinkShell.main(args)


baos.flush
val output: String = baos.toString
System.setOut(oldOut)

assert((!output.contains("Error")))
assert((!output.contains("ERROR")))
assert((!output.contains("Exception")))
assert((!output.contains("failed")))
}
}

0 comments on commit 135ca75

Please sign in to comment.