From 2284a1bd172dcc91b66d558de8a5e8fff67d1651 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 19 Apr 2016 15:04:37 +0200 Subject: [PATCH 1/3] [FLINK-3775] [shell] Load Flink configuration before forwarding it This commit makes sure that the GlobalConfiguration is loaded before the FlinkShell is started. --- .../scala/org/apache/flink/api/scala/FlinkShell.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index 2c2fbb3dbf261..ad08b720ac4d0 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -167,6 +167,11 @@ object FlinkShell { def startShell(config: Config): Unit = { println("Starting Flink Shell:") + // load global configuration + val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv + val configDirectory = new File(confDirPath) + GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath) + val (repl, cluster) = try { val (host, port, cluster) = fetchConnectionInfo(config) val conf = cluster match { @@ -216,13 +221,11 @@ object FlinkShell { val jarPath = new Path("file://" + s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}") yarnClient.setLocalJarPath(jarPath) - - // load configuration + val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv val flinkConfiguration = GlobalConfiguration.getConfiguration val confFile = new File(confDirPath + File.separator + "flink-conf.yaml") val confPath = new Path(confFile.getAbsolutePath) - GlobalConfiguration.loadConfiguration(confDirPath) yarnClient.setFlinkConfiguration(flinkConfiguration) yarnClient.setConfigurationDirectory(confDirPath) yarnClient.setConfigurationFilePath(confPath) From 1d4744745474cca8ab07fbdcfae26ac2fbf13e7a Mon Sep 17 00:00:00 2001 
From: Till Rohrmann Date: Mon, 25 Apr 2016 14:42:47 +0200 Subject: [PATCH 2/3] Add configDir option to FlinkShell This allows a configuration directory to be specified for the FlinkShell. If the CLI option is not set, then the system tries to find the configuration directory by first checking the FLINK_CONF_DIR environment variable and then the standard directories. --- .../apache/flink/api/scala/FlinkShell.scala | 47 ++++++++++++------- .../src/test/resources/flink-conf.yaml | 0 .../flink/api/scala/ScalaShellITCase.scala | 9 +++- .../scala/ScalaShellLocalStartupITCase.scala | 18 ++++--- 4 files changed, 49 insertions(+), 25 deletions(-) create mode 100644 flink-scala-shell/src/test/resources/flink-conf.yaml diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index ad08b720ac4d0..2618b096279fb 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -42,7 +42,8 @@ object FlinkShell { port: Option[Int] = None, externalJars: Option[Array[String]] = None, executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED, - yarnConfig: Option[YarnConfig] = None + yarnConfig: Option[YarnConfig] = None, + configDir: Option[String] = None ) /** YARN configuration object */ @@ -64,62 +65,68 @@ object FlinkShell { cmd("local") action { (_, c) => c.copy(executionMode = ExecutionMode.LOCAL) - } text("Starts Flink scala shell with a local Flink cluster") children( + } text "Starts Flink scala shell with a local Flink cluster" children( opt[(String)] ("addclasspath") abbr("a") valueName("") action { case (x, c) => val xArray = x.split(":") c.copy(externalJars = Option(xArray)) - } text("Specifies additional jars to be used in Flink") + } text "Specifies additional jars to be used in Flink" ) cmd("remote") action { (_, c) => c.copy(executionMode = 
ExecutionMode.REMOTE) - } text("Starts Flink scala shell connecting to a remote cluster") children( + } text "Starts Flink scala shell connecting to a remote cluster" children( arg[String]("") action { (h, c) => c.copy(host = Some(h)) } - text("Remote host name as string"), + text "Remote host name as string", arg[Int]("") action { (p, c) => c.copy(port = Some(p)) } - text("Remote port as integer\n"), - opt[(String)]("addclasspath") abbr("a") valueName("") action { + text "Remote port as integer\n", + opt[String]("addclasspath") abbr("a") valueName("") action { case (x, c) => val xArray = x.split(":") c.copy(externalJars = Option(xArray)) - } text ("Specifies additional jars to be used in Flink") + } text "Specifies additional jars to be used in Flink" ) cmd("yarn") action { (_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = None) - } text ("Starts Flink scala shell connecting to a yarn cluster") children( + } text "Starts Flink scala shell connecting to a yarn cluster" children( opt[Int]("container") abbr ("n") valueName ("arg") action { (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = Some(x)))) - } text ("Number of YARN container to allocate (= Number of TaskManagers)"), + } text "Number of YARN container to allocate (= Number of TaskManagers)", opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action { (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x)))) - } text ("Memory for JobManager container [in MB]"), + } text "Memory for JobManager container [in MB]", opt[String]("name") abbr ("nm") action { (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name = Some(x)))) - } text ("Set a custom name for the application on YARN"), + } text "Set a custom name for the application on YARN", opt[String]("queue") abbr ("qu") valueName ("") action { (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(queue = Some(x)))) - } text ("Specifies YARN queue"), + } text "Specifies 
YARN queue", opt[Int]("slots") abbr ("s") valueName ("") action { (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(x)))) - } text ("Number of slots per TaskManager"), + } text "Number of slots per TaskManager", opt[Int]("taskManagerMemory") abbr ("tm") valueName ("") action { (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x)))) - } text ("Memory per TaskManager container [in MB]"), + } text "Memory per TaskManager container [in MB]", opt[(String)] ("addclasspath") abbr("a") valueName("") action { case (x, c) => val xArray = x.split(":") c.copy(externalJars = Option(xArray)) - } text("Specifies additional jars to be used in Flink") + } text "Specifies additional jars to be used in Flink" ) - help("help") abbr ("h") text ("Prints this usage text") + opt[String]("configDir").optional().action { + (arg, conf) => conf.copy(configDir = Option(arg)) + } text { + "The configuration directory." + } + + help("help") abbr ("h") text "Prints this usage text" } // parse arguments @@ -168,7 +175,11 @@ object FlinkShell { println("Starting Flink Shell:") // load global configuration - val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv + val confDirPath = config.configDir match { + case Some(confDir) => confDir + case None => CliFrontend.getConfigurationDirectoryFromEnv + } + val configDirectory = new File(confDirPath) GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath) diff --git a/flink-scala-shell/src/test/resources/flink-conf.yaml b/flink-scala-shell/src/test/resources/flink-conf.yaml new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala index 6642cff215a44..6effce71fa886 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala +++ 
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala @@ -287,11 +287,18 @@ class ScalaShellITCase extends TestLogger { val oldOut: PrintStream = System.out System.setOut(new PrintStream(baos)) + val confFile: String = classOf[ScalaShellLocalStartupITCase] + .getResource("/flink-conf.yaml") + .getFile + val confDir = new File(confFile).getAbsoluteFile.getParent + val (c, args) = cluster match{ case Some(cl) => val arg = Array("remote", cl.hostname, - Integer.toString(cl.getLeaderRPCPort)) + Integer.toString(cl.getLeaderRPCPort), + "--configDir", + confDir) (cl, arg) case None => throw new AssertionError("Cluster creation failed.") diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala index 0e7dd5608c368..6f44bfe15376f 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala @@ -71,12 +71,18 @@ class ScalaShellLocalStartupITCase extends TestLogger { | |:q """.stripMargin - val in: BufferedReader = new BufferedReader(new StringReader(input + "\n")) - val out: StringWriter = new StringWriter - val baos: ByteArrayOutputStream = new ByteArrayOutputStream - val oldOut: PrintStream = System.out - System.setOut(new PrintStream(baos)) - val args: Array[String] = Array("local") + val in: BufferedReader = new BufferedReader(new StringReader(input + "\n")) + val out: StringWriter = new StringWriter + val baos: ByteArrayOutputStream = new ByteArrayOutputStream + val oldOut: PrintStream = System.out + System.setOut(new PrintStream(baos)) + + val confFile: String = classOf[ScalaShellLocalStartupITCase] + .getResource("/flink-conf.yaml") + .getFile + val confDir = new File(confFile).getAbsoluteFile.getParent + + val args: Array[String] = Array("local", 
"--configDir", confDir) //start flink scala shell FlinkShell.bufferedReader = Some(in); From 7c76ebd5b28fa39f996f2aab5a09a512967ed635 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 25 Apr 2016 15:35:20 +0200 Subject: [PATCH 3/3] Add Apache license header to dummy flink-conf.yaml file --- .../src/test/resources/flink-conf.yaml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/flink-scala-shell/src/test/resources/flink-conf.yaml b/flink-scala-shell/src/test/resources/flink-conf.yaml index e69de29bb2d1d..65b48d4d79b4e 100644 --- a/flink-scala-shell/src/test/resources/flink-conf.yaml +++ b/flink-scala-shell/src/test/resources/flink-conf.yaml @@ -0,0 +1,17 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################