From 37c9a466720a5370aeb09d4ff51ce6f241fc7f16 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Thu, 7 Jun 2018 17:47:32 +0800 Subject: [PATCH 1/2] [FLINK-9554][scala-shell] flink scala shell doesn't work in yarn mode --- .../org/apache/flink/api/scala/FlinkShell.scala | 17 +++++++++++++++-- .../start-script/start-scala-shell.sh | 7 +++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index b74a8a0d62ee7..cd8f5c5695758 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -20,6 +20,7 @@ package org.apache.flink.api.scala import java.io._ +import org.apache.commons.cli.{CommandLine, Options} import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser} import org.apache.flink.client.deployment.ClusterDescriptor import org.apache.flink.client.program.ClusterClient @@ -28,6 +29,7 @@ import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster} import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ import scala.tools.nsc.Settings import scala.tools.nsc.interpreter._ @@ -273,14 +275,25 @@ object FlinkShell { yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString)) yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString)) + val customCommandLines = CliFrontend.loadCustomCommandLines( + configuration,configurationDirectory) + val commandOptions = CliFrontendParser.getRunCommandOptions + val customCommandLineOptions = new Options() + customCommandLines.asScala.foreach(cmd => { + cmd.addGeneralOptions(customCommandLineOptions) + cmd.addRunOptions(customCommandLineOptions) + }) + val commandLineOptions = CliFrontendParser.mergeOptions( + commandOptions, customCommandLineOptions) + val commandLine = CliFrontendParser.parse( - CliFrontendParser.getRunCommandOptions, + commandLineOptions, args.toArray, true) val frontend = new CliFrontend( configuration, - CliFrontend.loadCustomCommandLines(configuration, configurationDirectory)) + customCommandLines) val customCLI = frontend.getActiveCustomCommandLine(commandLine) val clusterDescriptor = customCLI.createClusterDescriptor(commandLine) diff --git a/flink-scala-shell/start-script/start-scala-shell.sh b/flink-scala-shell/start-script/start-scala-shell.sh index 033d505032229..e40eb6f9063c3 100644 --- a/flink-scala-shell/start-script/start-scala-shell.sh +++ b/flink-scala-shell/start-script/start-scala-shell.sh @@ -19,6 +19,9 @@ # from scala-lang 2.10.4 +# Uncomment the following line to enable remote debug +# export FLINK_SCALA_SHELL_JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005" + # restore stty settings (echo in particular) function restoreSttySettings() { if [[ -n $SCALA_RUNNER_DEBUG ]]; then @@ -86,9 +89,9 @@ fi if ${EXTERNAL_LIB_FOUND} then - java -Dscala.color -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH" + java -Dscala.color $FLINK_SCALA_SHELL_JAVA_OPTS -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH" else - java -Dscala.color -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@ + java -Dscala.color $FLINK_SCALA_SHELL_JAVA_OPTS -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@ fi #restore echo From cdee42eba755ad44ec6027104f41cbf05ed864d6 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Tue, 3 Jul 2018 17:57:42 +0800 Subject: [PATCH 2/2] Address feedback --- .../apache/flink/client/cli/CliFrontend.java | 4 ++++ .../apache/flink/api/scala/FlinkShell.scala | 23 +++++-------------- .../start-script/start-scala-shell.sh | 7 ++---- 3 files changed, 12 insertions(+), 22 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index e9a15900387e9..e2a260c5478ba 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -166,6 +166,10 @@ public Configuration getConfiguration() { return copiedConfiguration; } + public Options getCustomCommandLineOptions() { + return customCommandLineOptions; + } + // -------------------------------------------------------------------------------------------- // Execute Actions // -------------------------------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index cd8f5c5695758..ae22d87f6cde1 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -275,25 +275,14 @@ object FlinkShell { yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString)) yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString)) - val customCommandLines = CliFrontend.loadCustomCommandLines( - configuration,configurationDirectory) - val commandOptions = CliFrontendParser.getRunCommandOptions - val customCommandLineOptions = new Options() - customCommandLines.asScala.foreach(cmd => { - cmd.addGeneralOptions(customCommandLineOptions) - cmd.addRunOptions(customCommandLineOptions) - }) - val commandLineOptions = CliFrontendParser.mergeOptions( - commandOptions, customCommandLineOptions) + val frontend = new CliFrontend(configuration, + CliFrontend.loadCustomCommandLines(configuration, configurationDirectory)) - val commandLine = CliFrontendParser.parse( - commandLineOptions, - args.toArray, - true) + val commandOptions = CliFrontendParser.getRunCommandOptions + val commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, + frontend.getCustomCommandLineOptions()); + val commandLine = CliFrontendParser.parse(commandLineOptions, args.toArray, true) - val frontend = new CliFrontend( - configuration, - customCommandLines) val customCLI = frontend.getActiveCustomCommandLine(commandLine) val clusterDescriptor = customCLI.createClusterDescriptor(commandLine) diff --git a/flink-scala-shell/start-script/start-scala-shell.sh b/flink-scala-shell/start-script/start-scala-shell.sh index e40eb6f9063c3..033d505032229 100644 --- a/flink-scala-shell/start-script/start-scala-shell.sh +++ b/flink-scala-shell/start-script/start-scala-shell.sh @@ -19,9 +19,6 @@ # from scala-lang 2.10.4 -# Uncomment the following line to enable remote debug -# export FLINK_SCALA_SHELL_JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005" - # restore stty settings (echo in particular) function restoreSttySettings() { if [[ -n $SCALA_RUNNER_DEBUG ]]; then @@ -89,9 +86,9 @@ fi if ${EXTERNAL_LIB_FOUND} then - java -Dscala.color $FLINK_SCALA_SHELL_JAVA_OPTS -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH" + java -Dscala.color -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH" else - java -Dscala.color $FLINK_SCALA_SHELL_JAVA_OPTS -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@ + java -Dscala.color -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@ fi #restore echo