From bf743a134bcfe46be5c561e4f81ffc5f5636fb19 Mon Sep 17 00:00:00 2001 From: Sachin Goel Date: Fri, 25 Sep 2015 13:43:45 +0530 Subject: [PATCH] [FLINK-2761][scala-shell]Prevent creation of new environment in Scala Shell --- .../api/java/ScalaShellRemoteEnvironment.java | 11 +++++++++++ .../org.apache.flink/api/scala/FlinkILoop.scala | 1 + .../flink/api/scala/ScalaShellITSuite.scala | 16 +++++++++++++--- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java index a7dc7088b51dd..859c68670792e 100644 --- a/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java +++ b/flink-staging/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java @@ -84,4 +84,15 @@ public JobExecutionResult execute(String jobName) throws Exception { executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled()); return executor.executePlan(p); } + + public void setAsContext() { + ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { + @Override + public ExecutionEnvironment createExecutionEnvironment() { + throw new UnsupportedOperationException("Execution Environment is already defined" + + " for this shell."); + } + }; + initializeContextEnvironment(factory); + } } diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala index 2797e4bdb55ec..1e96ba3aff2ef 100644 --- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala +++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala @@ -54,6 +54,7 @@ class FlinkILoop( // remote environment private val remoteEnv: ScalaShellRemoteEnvironment = { val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this) + remoteEnv.setAsContext() remoteEnv } diff --git a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala index e932cd2e11ab4..7648c50f8d087 100644 --- a/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala +++ b/flink-staging/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITSuite.scala @@ -33,6 +33,19 @@ import scala.tools.nsc.Settings @RunWith(classOf[JUnitRunner]) class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { + test("Prevent re-creation of environment") { + + val input: String = + """ + val env = ExecutionEnvironment.getExecutionEnvironment + """.stripMargin + + val output: String = processInShell(input) + + output should include("UnsupportedOperationException: Execution Environment is already " + + "defined for this shell") + } + test("Iteration test with iterative Pi example") { val input: String = @@ -224,9 +237,6 @@ class ScalaShellITSuite extends FunSuite with Matchers with BeforeAndAfterAll { false, false) - val clusterEnvironment = new TestEnvironment(cl, parallelism) - clusterEnvironment.setAsContext() - cluster = Some(cl) }