From 58a9f2618f3563827200697c956d72269d3364fd Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Thu, 4 Jun 2015 10:16:07 -0700 Subject: [PATCH 1/9] Send app secret to executors via pipes --- .../org/apache/spark/SecurityManager.scala | 14 ++++++-- .../scala/org/apache/spark/SparkConf.scala | 28 +++++++++++++++ .../spark/deploy/ApplicationDescription.scala | 5 ++- .../org/apache/spark/deploy/Client.scala | 5 +-- .../spark/deploy/DriverDescription.scala | 8 +++-- .../deploy/rest/StandaloneRestServer.scala | 9 +++-- .../spark/deploy/worker/DriverRunner.scala | 11 ++++-- .../spark/deploy/worker/DriverWrapper.scala | 1 + .../spark/deploy/worker/ExecutorRunner.scala | 2 ++ .../CoarseGrainedExecutorBackend.scala | 1 + .../cluster/SparkDeploySchedulerBackend.scala | 6 ++-- .../scala/org/apache/spark/util/Utils.scala | 34 +++++++++++++++++++ .../spark/deploy/master/MasterSuite.scala | 1 + 13 files changed, 109 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 8aed1e20e0686..4710ae6fcdeb8 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -188,11 +188,12 @@ import org.apache.spark.util.Utils private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder { + import SecurityManager._ // key used to store the spark secret in the Hadoop UGI private val sparkSecretLookupKey = "sparkCookie" - private val authOn = sparkConf.getBoolean("spark.authenticate", false) + private val authOn = sparkConf.isAuthOn // keep spark.ui.acls.enable for backwards compatibility with 1.0 private var aclsOn = sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false)) @@ -365,10 +366,10 @@ private[spark] class SecurityManager(sparkConf: SparkConf) cookie } else { // user must have set spark.authenticate.secret config - sparkConf.getOption("spark.authenticate.secret") match { + sparkConf.getAuthSecret match { case Some(value) => value case None => throw new Exception("Error: a secret key must be specified via the " + - "spark.authenticate.secret config") + AUTH_SECRET + " config") } } sCookie @@ -449,3 +450,10 @@ private[spark] class SecurityManager(sparkConf: SparkConf) override def getSaslUser(appId: String): String = getSaslUser() override def getSecretKey(appId: String): String = getSecretKey() } + +private[spark] object SecurityManager { + + val AUTH_CONFIG: String = "spark.authenticate" + val AUTH_SECRET: String = "spark.authenticate.secret" + +} diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 46d72841dccce..0684186e99708 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -333,6 +333,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { */ def getAppId: String = get("spark.app.id") + /** + * Check to see if authentication for the Spark communication protocols is enabled + * @return true if authentication is enabled, otherwise false + */ + private[spark] def isAuthOn: Boolean = getBoolean(SecurityManager.AUTH_CONFIG, false) + + /** + * Return the authentication key for standalone cluster managers (Master and Worker). + * + * In non-YARN deployments, this key is also used for authentication in apps (e.g., + * between driver and executor). 
See [[org.apache.spark.SecurityManager]] for details. + */ + private[spark] def getAuthSecret: Option[String] = getOption(SecurityManager.AUTH_SECRET) + /** Does the configuration contain a given parameter? */ def contains(key: String): Boolean = settings.containsKey(key) @@ -562,6 +576,20 @@ private[spark] object SparkConf extends Logging { isSparkPortConf(name) } + /** + * Return true if the given config is NOT for authentication secret. + */ + private[spark] def isNotAuthSecretConf(name: String): Boolean = + name != SecurityManager.AUTH_SECRET + + /** + * Return whether the given config should be passed to an executor launched by standalone + * cluster manager as JVM system properties on start-up. In particular, authentication + * secret is filtered out since it will be written to executor's stdin. + */ + private[spark] def isStandaloneExecutorStartupConf(name: String): Boolean = + isExecutorStartupConf(name) && isNotAuthSecretConf(name) + /** * Return true if the given config matches either `spark.*.port` or `spark.port.*`. */ diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index ae99432f5ce86..d748e686ff856 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -25,6 +25,7 @@ private[spark] class ApplicationDescription( val memoryPerExecutorMB: Int, val command: Command, var appUiUrl: String, + val appSecret: Option[String] = None, val eventLogDir: Option[URI] = None, // short name of compression codec used when writing event logs, if any (e.g. lzf) val eventLogCodec: Option[String] = None, @@ -39,10 +40,12 @@ private[spark] class ApplicationDescription( memoryPerExecutorMB: Int = memoryPerExecutorMB, command: Command = command, appUiUrl: String = appUiUrl, + appSecret: Option[String] = appSecret, eventLogDir: Option[URI] = eventLogDir, eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription = new ApplicationDescription( - name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec) + name, maxCores, memoryPerExecutorMB, command, appUiUrl, + appSecret, eventLogDir, eventLogCodec) override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 848b62f9de71b..3a4d5f211000d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -70,7 +70,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) val extraJavaOptsConf = "spark.driver.extraJavaOptions" val extraJavaOpts = sys.props.get(extraJavaOptsConf) .map(Utils.splitCommandString).getOrElse(Seq.empty) - val sparkJavaOpts = Utils.sparkJavaOpts(conf) + val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isNotAuthSecretConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command(mainClass, Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions, @@ -81,7 +81,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) driverArgs.memory, driverArgs.cores, driverArgs.supervise, - command) + command, + conf.getAuthSecret) // app secret is cluster auth secret for now // This assumes only one Master is active at a time for (masterActor <- masterActors) { diff --git 
a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index 659fb434a80f5..6788bfa21f40c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -22,7 +22,8 @@ private[deploy] class DriverDescription( val mem: Int, val cores: Int, val supervise: Boolean, - val command: Command) + val command: Command, + val appSecret: Option[String] = None) extends Serializable { def copy( @@ -30,8 +31,9 @@ private[deploy] class DriverDescription( mem: Int = mem, cores: Int = cores, supervise: Boolean = supervise, - command: Command = command): DriverDescription = - new DriverDescription(jarUrl, mem, cores, supervise, command) + command: Command = command, + appSecret: Option[String] = appSecret): DriverDescription = + new DriverDescription(jarUrl, mem, cores, supervise, command, appSecret) override def toString: String = s"DriverDescription (${command.mainClass})" } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 502b9bb701ccf..c9e90dd3e26dc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -150,7 +150,7 @@ private[rest] class StandaloneSubmitRequestServlet( val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator)) val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) - val sparkJavaOpts = Utils.sparkJavaOpts(conf) + val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isNotAuthSecretConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command( "org.apache.spark.deploy.worker.DriverWrapper", @@ -160,7 +160,12 @@ private[rest] class StandaloneSubmitRequestServlet( val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES) val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) new DriverDescription( - appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command) + appResource, + actualDriverMemory, + actualDriverCores, + actualSuperviseDriver, + command, + conf.getAuthSecret) // app secret is cluster auth secret for now } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index ef7a703bffe67..3fde97802fb77 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -87,7 +87,7 @@ private[deploy] class DriverRunner( // TODO: If we add ability to submit multiple jars they should also be added here val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) - launchDriver(builder, driverDir, driverDesc.supervise) + launchDriver(builder, driverDir, driverDesc.supervise, driverDesc.appSecret, conf) } catch { case e: Exception => finalException = Some(e) @@ -164,9 +164,16 @@ private[deploy] class DriverRunner( localJarFilename } - private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) { + private def launchDriver( + builder: 
ProcessBuilder, + baseDir: File, + supervise: Boolean, + appSecret: Option[String], + sparkConf: SparkConf) { builder.directory(baseDir) def initialize(process: Process): Unit = { + // send app secret to driver via stdin if needed + Utils.pipeOutAppSecretIfNeeded(process, appSecret, sparkConf) // Redirect stdout and stderr to files val stdout = new File(baseDir, "stdout") CommandUtils.redirectStream(process.getInputStream, stdout) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index d1a12b01e78f7..fa0ebdc1795ac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -38,6 +38,7 @@ object DriverWrapper { */ case workerUrl :: userJar :: mainClass :: extraArgs => val conf = new SparkConf() + Utils.setAndExportAppSecretIfNeeded(conf) val rpcEnv = RpcEnv.create("Driver", Utils.localHostName(), 0, conf, new SecurityManager(conf)) rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl)) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 7aa85b732fc87..fde86261a4baa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -146,6 +146,8 @@ private[deploy] class ExecutorRunner( val header = "Spark Executor Command: %s\n%s\n\n".format( command.mkString("\"", "\" \"", "\""), "=" * 40) + // send app secret to executor via stdin if needed + Utils.pipeOutAppSecretIfNeeded(process, appDesc.appSecret, conf) // Redirect its stdout and stderr to files val stdout = new File(executorDir, "stdout") stdoutAppender = FileAppender(process.getInputStream, stdout, conf) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index f3a26f54a81fb..5c5093dfde634 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -149,6 +149,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // Bootstrap to fetch the driver's Spark properties. 
val executorConf = new SparkConf + Utils.setAndExportAppSecretIfNeeded(executorConf) val port = executorConf.getInt("spark.executor.port", 0) val fetcher = RpcEnv.create( "driverPropsFetcher", diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index ccf1dc5af6120..2c7e020644a32 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -77,14 +77,14 @@ private[spark] class SparkDeploySchedulerBackend( } // Start executors with a few necessary configs for registering with the scheduler - val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) + val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isStandaloneExecutorStartupConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) - val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, - command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor) + val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,command, + appUIAddress, conf.getAuthSecret, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 693e1a0a3d5f0..9943fbc908bda 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -324,6 +324,40 @@ private[spark] object Utils extends Logging { dir } + /** + * If authentication is enabled, send app secret to process via its stdin. + */ + def pipeOutAppSecretIfNeeded( + process: Process, + secret: Option[String], + conf: SparkConf): Unit = + for (value <- secret if conf.isAuthOn) { + val out = new BufferedOutputStream(process.getOutputStream) + out.write(value.getBytes) + out.close + } + + /** + * If authentication is enabled, retrieve app secret from stdin, set it in conf and + * export it to system properties. + * + * Note: This mutates state in the given SparkConf and in this JVM's system properties. + */ + def setAndExportAppSecretIfNeeded(conf: SparkConf): Unit = { + if (conf.isAuthOn) { + val in = new BufferedReader(new InputStreamReader(System.in)) + Option(in.readLine) match { + case Some(value) => + conf.set(SecurityManager.AUTH_SECRET, value) + sys.props.update(SecurityManager.AUTH_SECRET, value) + + case None => throw new Exception("Error: authentication is enabled but " + + "failed to obtain authentication key from stdin") + } + in.close + } + } + /** Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream * copying is disabled by default unless explicitly set transferToEnabled as true, * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false]. 
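For orientation, the hand-off implemented by the two new Utils helpers above is just a pipe over the child process' stdin. Below is a minimal, self-contained sketch of that protocol — the names are illustrative, not the patch's exact code, and it uses the UTF-8 reader/writer form that a later patch in this series adopts:

import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter}

object SecretPipeSketch {
  // Parent side: invoked right after the child Process has been started.
  def writeSecret(process: Process, secret: Option[String], authOn: Boolean): Unit =
    for (value <- secret if authOn) {
      val out = new BufferedWriter(new OutputStreamWriter(process.getOutputStream, "UTF-8"))
      try out.write(value) finally out.close()  // close() flushes and signals EOF to the child
    }

  // Child side: invoked before the SparkConf is handed to a SecurityManager.
  def readSecret(authOn: Boolean): Option[String] =
    if (!authOn) None
    else {
      val in = new BufferedReader(new InputStreamReader(System.in, "UTF-8"))
      try Option(in.readLine()) finally in.close()
    }
}

The parent (DriverRunner/ExecutorRunner) writes; the child (DriverWrapper/CoarseGrainedExecutorBackend) reads and copies the value into its SparkConf and system properties, so the secret never has to appear on the command line or in the environment.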
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 014e87bb40254..e95eaa09b6c0c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -102,6 +102,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually { memoryPerExecutorMB = 0, command = commandToPersist, appUiUrl = "", + appSecret = None, eventLogDir = None, eventLogCodec = None, coresPerExecutor = None), From 5d333d954357a94961275ea873e4319e86ab47ff Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Thu, 4 Jun 2015 12:04:02 -0700 Subject: [PATCH 2/9] minor refactoring --- .../org/apache/spark/SecurityManager.scala | 10 ++++---- .../scala/org/apache/spark/SparkConf.scala | 24 ++++++++++++------- .../org/apache/spark/deploy/Client.scala | 4 ++-- .../deploy/rest/StandaloneRestServer.scala | 4 ++-- .../cluster/SparkDeploySchedulerBackend.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 8 +++---- 6 files changed, 29 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 4710ae6fcdeb8..3dbc6e81cbd00 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -193,7 +193,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) // key used to store the spark secret in the Hadoop UGI private val sparkSecretLookupKey = "sparkCookie" - private val authOn = sparkConf.isAuthOn + private val authOn = sparkConf.authOn // keep spark.ui.acls.enable for backwards compatibility with 1.0 private var aclsOn = sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false)) @@ -366,10 +366,10 @@ private[spark] class SecurityManager(sparkConf: SparkConf) cookie } else { // user must have set spark.authenticate.secret config - sparkConf.getAuthSecret match { + sparkConf.getClusterAuthSecret match { case Some(value) => value case None => throw new Exception("Error: a secret key must be specified via the " + - AUTH_SECRET + " config") + CLUSTER_AUTH_SECRET_CONF + " config") } } sCookie @@ -453,7 +453,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) private[spark] object SecurityManager { - val AUTH_CONFIG: String = "spark.authenticate" - val AUTH_SECRET: String = "spark.authenticate.secret" + val CLUSTER_AUTH_CONF: String = "spark.authenticate" + val CLUSTER_AUTH_SECRET_CONF: String = "spark.authenticate.secret" } diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 0684186e99708..c139a87fc9abb 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -337,7 +337,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * Check to see if authentication for the Spark communication protocols is enabled * @return true if authentication is enabled, otherwise false */ - private[spark] def isAuthOn: Boolean = getBoolean(SecurityManager.AUTH_CONFIG, false) + private[spark] def authOn: Boolean = { + getBoolean(SecurityManager.CLUSTER_AUTH_CONF, false) + } /** * Return the authentication key for standalone cluster managers (Master and Worker). 
@@ -345,7 +347,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * In non-YARN deployments, this key is also used for authentication in apps (e.g., * between driver and executor). See [[org.apache.spark.SecurityManager]] for details. */ - private[spark] def getAuthSecret: Option[String] = getOption(SecurityManager.AUTH_SECRET) + private[spark] def getClusterAuthSecret: Option[String] = { + getOption(SecurityManager.CLUSTER_AUTH_SECRET_CONF) + } /** Does the configuration contain a given parameter? */ def contains(key: String): Boolean = settings.containsKey(key) @@ -577,18 +581,20 @@ private[spark] object SparkConf extends Logging { } /** - * Return true if the given config is NOT for authentication secret. + * Return true if the given config is NOT for cluster authentication secret. */ - private[spark] def isNotAuthSecretConf(name: String): Boolean = - name != SecurityManager.AUTH_SECRET + private[spark] def isNotClusterAuthSecretConf(name: String): Boolean = { + name != SecurityManager.CLUSTER_AUTH_SECRET_CONF + } /** * Return whether the given config should be passed to an executor launched by standalone - * cluster manager as JVM system properties on start-up. In particular, authentication - * secret is filtered out since it will be written to executor's stdin. + * cluster manager as JVM system properties on start-up. In particular, cluster + * authentication secret is filtered out since it will be written to executor's stdin. */ - private[spark] def isStandaloneExecutorStartupConf(name: String): Boolean = - isExecutorStartupConf(name) && isNotAuthSecretConf(name) + private[spark] def isStandaloneExecutorStartupConf(name: String): Boolean = { + isExecutorStartupConf(name) && isNotClusterAuthSecretConf(name) + } /** * Return true if the given config matches either `spark.*.port` or `spark.port.*`. 
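The predicates above are what keep the secret out of the -D options built for child JVMs. A small, self-contained sketch of how such a filter composes with a sparkJavaOpts-style helper (simplified here — the real Utils.sparkJavaOpts takes a SparkConf plus a key filter, as the call sites in this series show):

object StartupConfFilterSketch {
  val CLUSTER_AUTH_SECRET_CONF = "spark.authenticate.secret"

  def isNotClusterAuthSecretConf(name: String): Boolean =
    name != CLUSTER_AUTH_SECRET_CONF

  // Simplified stand-in for Utils.sparkJavaOpts(conf, filter): turn the surviving
  // settings into -D options for the child JVM's command line.
  def sparkJavaOpts(settings: Seq[(String, String)],
      filterKey: String => Boolean): Seq[String] =
    for ((k, v) <- settings if filterKey(k)) yield s"-D$k=$v"

  def main(args: Array[String]): Unit = {
    val opts = sparkJavaOpts(
      Seq("spark.authenticate" -> "true", CLUSTER_AUTH_SECRET_CONF -> "hush"),
      isNotClusterAuthSecretConf)
    println(opts)  // List(-Dspark.authenticate=true): the secret key is dropped
  }
}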
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 3a4d5f211000d..11f587a0cd168 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -70,7 +70,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) val extraJavaOptsConf = "spark.driver.extraJavaOptions" val extraJavaOpts = sys.props.get(extraJavaOptsConf) .map(Utils.splitCommandString).getOrElse(Seq.empty) - val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isNotAuthSecretConf) + val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isNotClusterAuthSecretConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command(mainClass, Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions, @@ -82,7 +82,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) driverArgs.cores, driverArgs.supervise, command, - conf.getAuthSecret) // app secret is cluster auth secret for now + conf.getClusterAuthSecret) // app secret is cluster auth secret for now // This assumes only one Master is active at a time for (masterActor <- masterActors) { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index c9e90dd3e26dc..ec810bb630deb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -150,7 +150,7 @@ private[rest] class StandaloneSubmitRequestServlet( val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator)) val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) - val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isNotAuthSecretConf) + val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isNotClusterAuthSecretConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = new Command( "org.apache.spark.deploy.worker.DriverWrapper", @@ -165,7 +165,7 @@ private[rest] class StandaloneSubmitRequestServlet( actualDriverCores, actualSuperviseDriver, command, - conf.getAuthSecret) // app secret is cluster auth secret for now + conf.getClusterAuthSecret) // app secret is cluster auth secret for now } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 2c7e020644a32..0ffb852e60f93 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -84,7 +84,7 @@ private[spark] class SparkDeploySchedulerBackend( val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,command, - appUIAddress, conf.getAuthSecret, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor) + appUIAddress, conf.getClusterAuthSecret, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() diff --git 
a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 9943fbc908bda..06a6ccc677c4a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -331,7 +331,7 @@ private[spark] object Utils extends Logging { process: Process, secret: Option[String], conf: SparkConf): Unit = - for (value <- secret if conf.isAuthOn) { + for (value <- secret if conf.authOn) { val out = new BufferedOutputStream(process.getOutputStream) out.write(value.getBytes) out.close @@ -344,12 +344,12 @@ private[spark] object Utils extends Logging { * Note: This mutates state in the given SparkConf and in this JVM's system properties. */ def setAndExportAppSecretIfNeeded(conf: SparkConf): Unit = { - if (conf.isAuthOn) { + if (conf.authOn) { val in = new BufferedReader(new InputStreamReader(System.in)) Option(in.readLine) match { case Some(value) => - conf.set(SecurityManager.AUTH_SECRET, value) - sys.props.update(SecurityManager.AUTH_SECRET, value) + conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, value) + sys.props.update(SecurityManager.CLUSTER_AUTH_SECRET_CONF, value) case None => throw new Exception("Error: authentication is enabled but " + "failed to obtain authentication key from stdin") From 7404ab35ec6c599ddb59ac72c816d8da83f92e6b Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Thu, 4 Jun 2015 15:59:17 -0700 Subject: [PATCH 3/9] adding StandaloneRestServer test --- .../scala/org/apache/spark/SparkConf.scala | 4 +- .../org/apache/spark/deploy/Client.scala | 2 +- .../deploy/rest/StandaloneRestServer.scala | 4 +- .../cluster/SparkDeploySchedulerBackend.scala | 12 +++- .../rest/StandaloneRestServerSuite.scala | 72 +++++++++++++++++++ 5 files changed, 87 insertions(+), 7 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index c139a87fc9abb..a45d13c394baf 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -589,8 +589,8 @@ private[spark] object SparkConf extends Logging { /** * Return whether the given config should be passed to an executor launched by standalone - * cluster manager as JVM system properties on start-up. In particular, cluster - * authentication secret is filtered out since it will be written to executor's stdin. + * cluster manager as java options on start-up. In particular, cluster authentication + * secret is filtered out since it will be written to executor's stdin. 
*/ private[spark] def isStandaloneExecutorStartupConf(name: String): Boolean = { isExecutorStartupConf(name) && isNotClusterAuthSecretConf(name) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 11f587a0cd168..a26a8997d8e7f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -82,7 +82,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) driverArgs.cores, driverArgs.supervise, command, - conf.getClusterAuthSecret) // app secret is cluster auth secret for now + if (conf.authOn) conf.getClusterAuthSecret else None) // This assumes only one Master is active at a time for (masterActor <- masterActors) { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index ec810bb630deb..c3eaa823a02d1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -123,7 +123,7 @@ private[rest] class StandaloneSubmitRequestServlet( * fields used by python applications since python is not supported in standalone * cluster mode yet. */ - private def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = { + private[rest] def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = { // Required fields, including the main class because python is not yet supported val appResource = Option(request.appResource).getOrElse { throw new SubmitRestMissingFieldException("Application jar is missing.") @@ -165,7 +165,7 @@ private[rest] class StandaloneSubmitRequestServlet( actualDriverCores, actualSuperviseDriver, command, - conf.getClusterAuthSecret) // app secret is cluster auth secret for now + if (conf.authOn) conf.getClusterAuthSecret else None) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 0ffb852e60f93..dbb24893501d4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -83,8 +83,16 @@ private[spark] class SparkDeploySchedulerBackend( args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) - val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,command, - appUIAddress, conf.getClusterAuthSecret, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor) + val appDesc = new ApplicationDescription( + sc.appName, + maxCores, + sc.executorMemory, + command, + appUIAddress, + if (conf.authOn) conf.getClusterAuthSecret else None, + sc.eventLogDir, + sc.eventLogCodec, + coresPerExecutor) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala new file mode 100644 index 0000000000000..ea758ff15f5fd --- /dev/null +++ 
b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.rest + +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} + +/** + * Tests for the Standalone REST server. + */ +class StandaloneRestServerSuite extends SparkFunSuite { + + test("Auth secret shouldn't appear on the command line") { + val servlet = new StandaloneSubmitRequestServlet(null , "", null) + val request = new CreateSubmissionRequest + request.clientSparkVersion = "1.2.3" + request.appResource = "honey-walnut-cherry.jar" + request.mainClass = "org.apache.spark.examples.SparkPie" + request.appArgs = Array("two slices", "a hint of cinnamon") + val conf = new SparkConf(false) + conf.set("spark.app.name", "SparkPie") + request.sparkProperties = conf.getAll.toMap + request.validate() + + // optional fields + conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") + request.sparkProperties = conf.getAll.toMap + request.validate() + var driver = servlet.buildDriverDescription(request) + assert(driver.appSecret === None) + assert(!driver.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_CONF))) + assert(!driver.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + + conf.set(SecurityManager.CLUSTER_AUTH_CONF, "false") + conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") + request.sparkProperties = conf.getAll.toMap + request.validate() + driver = servlet.buildDriverDescription(request) + assert(driver.appSecret === None) + assert(driver.command.javaOpts.contains( + "-D" + SecurityManager.CLUSTER_AUTH_CONF + "=false")) + assert(!driver.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + + conf.set(SecurityManager.CLUSTER_AUTH_CONF, "true") + conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") + request.sparkProperties = conf.getAll.toMap + request.validate() + driver = servlet.buildDriverDescription(request) + assert(driver.appSecret === Some("This is the secret sauce")) + assert(driver.command.javaOpts.contains( + "-D" + SecurityManager.CLUSTER_AUTH_CONF + "=true")) + assert(!driver.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + } +} From 9f047c44a8ff63483a85ec82cb0b598294c3e5cc Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Thu, 4 Jun 2015 17:06:00 -0700 Subject: [PATCH 4/9] use PrivateMethodTester --- .../spark/deploy/rest/StandaloneRestServer.scala | 2 +- .../deploy/rest/StandaloneRestServerSuite.scala | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index c3eaa823a02d1..85303965bcb21 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -123,7 +123,7 @@ private[rest] class StandaloneSubmitRequestServlet( * fields used by python applications since python is not supported in standalone * cluster mode yet. */ - private[rest] def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = { + private def buildDriverDescription(request: CreateSubmissionRequest): DriverDescription = { // Required fields, including the main class because python is not yet supported val appResource = Option(request.appResource).getOrElse { throw new SubmitRestMissingFieldException("Application jar is missing.") diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala index ea758ff15f5fd..330d4ed686b53 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala @@ -17,15 +17,19 @@ package org.apache.spark.deploy.rest +import org.scalatest.PrivateMethodTester + import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.DriverDescription /** * Tests for the Standalone REST server. */ -class StandaloneRestServerSuite extends SparkFunSuite { +class StandaloneRestServerSuite extends SparkFunSuite with PrivateMethodTester { test("Auth secret shouldn't appear on the command line") { val servlet = new StandaloneSubmitRequestServlet(null , "", null) + val buildDriverDesc = PrivateMethod[DriverDescription]('buildDriverDescription) val request = new CreateSubmissionRequest request.clientSparkVersion = "1.2.3" request.appResource = "honey-walnut-cherry.jar" @@ -40,7 +44,7 @@ class StandaloneRestServerSuite extends SparkFunSuite { conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") request.sparkProperties = conf.getAll.toMap request.validate() - var driver = servlet.buildDriverDescription(request) + var driver = servlet invokePrivate buildDriverDesc(request) assert(driver.appSecret === None) assert(!driver.command.javaOpts.exists( _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_CONF))) @@ -51,7 +55,7 @@ class StandaloneRestServerSuite extends SparkFunSuite { conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") request.sparkProperties = conf.getAll.toMap request.validate() - driver = servlet.buildDriverDescription(request) + driver = servlet invokePrivate buildDriverDesc(request) assert(driver.appSecret === None) assert(driver.command.javaOpts.contains( "-D" + SecurityManager.CLUSTER_AUTH_CONF + "=false")) @@ -62,7 +66,7 @@ class StandaloneRestServerSuite extends SparkFunSuite { conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") request.sparkProperties = conf.getAll.toMap request.validate() - driver = servlet.buildDriverDescription(request) + driver = servlet invokePrivate buildDriverDesc(request) assert(driver.appSecret === Some("This is the secret sauce")) assert(driver.command.javaOpts.contains( "-D" + SecurityManager.CLUSTER_AUTH_CONF + "=true")) From 3f78b261af266e150abad9922418eb7b16d4eee8 Mon Sep 17 00:00:00 2001 From: 
Kan Zhang Date: Thu, 4 Jun 2015 18:31:09 -0700 Subject: [PATCH 5/9] test refactoring --- .../spark/deploy/rest/StandaloneRestServerSuite.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala index 330d4ed686b53..5564c3f3c8d73 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala @@ -40,10 +40,11 @@ class StandaloneRestServerSuite extends SparkFunSuite with PrivateMethodTester { request.sparkProperties = conf.getAll.toMap request.validate() - // optional fields + // set secret conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") + + // auth is not set request.sparkProperties = conf.getAll.toMap - request.validate() var driver = servlet invokePrivate buildDriverDesc(request) assert(driver.appSecret === None) assert(!driver.command.javaOpts.exists( @@ -51,10 +52,9 @@ class StandaloneRestServerSuite extends SparkFunSuite with PrivateMethodTester { assert(!driver.command.javaOpts.exists( _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + // auth is set to false conf.set(SecurityManager.CLUSTER_AUTH_CONF, "false") - conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") request.sparkProperties = conf.getAll.toMap - request.validate() driver = servlet invokePrivate buildDriverDesc(request) assert(driver.appSecret === None) assert(driver.command.javaOpts.contains( @@ -62,10 +62,9 @@ class StandaloneRestServerSuite extends SparkFunSuite with PrivateMethodTester { assert(!driver.command.javaOpts.exists( _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + // auth is set to true conf.set(SecurityManager.CLUSTER_AUTH_CONF, "true") - conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") request.sparkProperties = conf.getAll.toMap - request.validate() driver = servlet invokePrivate buildDriverDesc(request) assert(driver.appSecret === Some("This is the secret sauce")) assert(driver.command.javaOpts.contains( From e535d3dfd927f479d4a495913c7288d0e3c2ef52 Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Fri, 5 Jun 2015 10:56:34 -0700 Subject: [PATCH 6/9] Add SparkDeploySchedulerBackend test --- .../cluster/SparkDeploySchedulerBackend.scala | 92 ++++++++++--------- .../rest/StandaloneRestServerSuite.scala | 26 +++--- .../SparkDeploySchedulerBackendSuite.scala | 70 ++++++++++++++ 3 files changed, 134 insertions(+), 54 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackendSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index dbb24893501d4..abe79a2f9fbf7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -33,6 +33,7 @@ private[spark] class SparkDeploySchedulerBackend( extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with AppClientListener with Logging { + import SparkDeploySchedulerBackend._ private var client: AppClient = null private var stopping = false @@ -52,47 +53,7 @@ private[spark] class 
SparkDeploySchedulerBackend( val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName, RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt), CoarseGrainedSchedulerBackend.ENDPOINT_NAME) - val args = Seq( - "--driver-url", driverUrl, - "--executor-id", "{{EXECUTOR_ID}}", - "--hostname", "{{HOSTNAME}}", - "--cores", "{{CORES}}", - "--app-id", "{{APP_ID}}", - "--worker-url", "{{WORKER_URL}}") - val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") - .map(Utils.splitCommandString).getOrElse(Seq.empty) - val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath") - .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) - val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath") - .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) - - // When testing, expose the parent class path to the child. This is processed by - // compute-classpath.{cmd,sh} and makes all needed jars available to child processes - // when the assembly is built with the "*-provided" profiles enabled. - val testingClassPath = - if (sys.props.contains("spark.testing")) { - sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq - } else { - Nil - } - - // Start executors with a few necessary configs for registering with the scheduler - val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isStandaloneExecutorStartupConf) - val javaOpts = sparkJavaOpts ++ extraJavaOpts - val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", - args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) - val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") - val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) - val appDesc = new ApplicationDescription( - sc.appName, - maxCores, - sc.executorMemory, - command, - appUIAddress, - if (conf.authOn) conf.getClusterAuthSecret else None, - sc.eventLogDir, - sc.eventLogCodec, - coresPerExecutor) + val appDesc = buildAppDescription(driverUrl, maxCores, sc) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() waitForRegistration() @@ -169,3 +130,52 @@ private[spark] class SparkDeploySchedulerBackend( } } + +private[spark] object SparkDeploySchedulerBackend { + + // Exposed for testing + private def buildAppDescription(driverUrl: String, maxCores: Option[Int], + sc: SparkContext): ApplicationDescription = { + val args = Seq( + "--driver-url", driverUrl, + "--executor-id", "{{EXECUTOR_ID}}", + "--hostname", "{{HOSTNAME}}", + "--cores", "{{CORES}}", + "--app-id", "{{APP_ID}}", + "--worker-url", "{{WORKER_URL}}") + val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") + .map(Utils.splitCommandString).getOrElse(Seq.empty) + val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath") + .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) + val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath") + .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) + + // When testing, expose the parent class path to the child. This is processed by + // compute-classpath.{cmd,sh} and makes all needed jars available to child processes + // when the assembly is built with the "*-provided" profiles enabled. 
+ val testingClassPath = + if (sys.props.contains("spark.testing")) { + sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq + } else { + Nil + } + + // Start executors with a few necessary configs for registering with the scheduler + val sparkJavaOpts = Utils.sparkJavaOpts(sc.conf, SparkConf.isStandaloneExecutorStartupConf) + val javaOpts = sparkJavaOpts ++ extraJavaOpts + val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", + args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) + val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") + val coresPerExecutor = sc.conf.getOption("spark.executor.cores").map(_.toInt) + new ApplicationDescription( + sc.appName, + maxCores, + sc.executorMemory, + command, + appUIAddress, + if (sc.conf.authOn) sc.conf.getClusterAuthSecret else None, + sc.eventLogDir, + sc.eventLogCodec, + coresPerExecutor) + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala index 5564c3f3c8d73..5f71d1839b465 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestServerSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.deploy.DriverDescription */ class StandaloneRestServerSuite extends SparkFunSuite with PrivateMethodTester { - test("Auth secret shouldn't appear on the command line") { + test("Auth secret shouldn't appear in the command") { val servlet = new StandaloneSubmitRequestServlet(null , "", null) val buildDriverDesc = PrivateMethod[DriverDescription]('buildDriverDescription) val request = new CreateSubmissionRequest @@ -45,31 +45,31 @@ class StandaloneRestServerSuite extends SparkFunSuite with PrivateMethodTester { // auth is not set request.sparkProperties = conf.getAll.toMap - var driver = servlet invokePrivate buildDriverDesc(request) - assert(driver.appSecret === None) - assert(!driver.command.javaOpts.exists( + var driverDesc = servlet invokePrivate buildDriverDesc(request) + assert(driverDesc.appSecret === None) + assert(!driverDesc.command.javaOpts.exists( _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_CONF))) - assert(!driver.command.javaOpts.exists( + assert(!driverDesc.command.javaOpts.exists( _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) // auth is set to false conf.set(SecurityManager.CLUSTER_AUTH_CONF, "false") request.sparkProperties = conf.getAll.toMap - driver = servlet invokePrivate buildDriverDesc(request) - assert(driver.appSecret === None) - assert(driver.command.javaOpts.contains( + driverDesc = servlet invokePrivate buildDriverDesc(request) + assert(driverDesc.appSecret === None) + assert(driverDesc.command.javaOpts.contains( "-D" + SecurityManager.CLUSTER_AUTH_CONF + "=false")) - assert(!driver.command.javaOpts.exists( + assert(!driverDesc.command.javaOpts.exists( _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) // auth is set to true conf.set(SecurityManager.CLUSTER_AUTH_CONF, "true") request.sparkProperties = conf.getAll.toMap - driver = servlet invokePrivate buildDriverDesc(request) - assert(driver.appSecret === Some("This is the secret sauce")) - assert(driver.command.javaOpts.contains( + driverDesc = servlet invokePrivate buildDriverDesc(request) + assert(driverDesc.appSecret === Some("This is the secret sauce")) + assert(driverDesc.command.javaOpts.contains( "-D" + 
SecurityManager.CLUSTER_AUTH_CONF + "=true")) - assert(!driver.command.javaOpts.exists( + assert(!driverDesc.command.javaOpts.exists( _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackendSuite.scala new file mode 100644 index 0000000000000..d167104634247 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackendSuite.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.scalatest.PrivateMethodTester + +import org.apache.spark._ +import org.apache.spark.deploy.ApplicationDescription + +class SparkDeploySchedulerBackendSuite + extends SparkFunSuite with LocalSparkContext with PrivateMethodTester { + + val buildAppDesc = PrivateMethod[ApplicationDescription]('buildAppDescription) + + test("Auth secret shouldn't appear in the command when auth is not set") { + val conf = new SparkConf + // always set secret + conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") + sc = new SparkContext("local", "test", conf) + val appDesc = SparkDeploySchedulerBackend invokePrivate buildAppDesc("", Option(1), sc) + assert(appDesc.appSecret === None) + assert(!appDesc.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_CONF))) + assert(!appDesc.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + } + + test("Auth secret shouldn't appear in the command when auth is set to false") { + val conf = new SparkConf + conf.set(SecurityManager.CLUSTER_AUTH_CONF, "false") + // always set secret + conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") + sc = new SparkContext("local", "test", conf) + val appDesc = SparkDeploySchedulerBackend invokePrivate buildAppDesc("", Option(1), sc) + assert(appDesc.appSecret === None) + assert(appDesc.command.javaOpts.contains( + "-D" + SecurityManager.CLUSTER_AUTH_CONF + "=false")) + assert(!appDesc.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + } + + test("Auth secret shouldn't appear in the command when auth is set to true") { + val conf = new SparkConf + conf.set(SecurityManager.CLUSTER_AUTH_CONF, "true") + // always set secret + conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") + sc = new SparkContext("local", "test", conf) + val appDesc = SparkDeploySchedulerBackend invokePrivate buildAppDesc("", Option(1), sc) + assert(appDesc.appSecret === Some("This is the secret sauce")) + assert(appDesc.command.javaOpts.contains( + "-D" 
+ SecurityManager.CLUSTER_AUTH_CONF + "=true")) + assert(!appDesc.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + } +} From d05244ef03be899ec78e4ecb87a4f7ce6f67b025 Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Fri, 5 Jun 2015 13:52:49 -0700 Subject: [PATCH 7/9] added ClientSuite test --- .../org/apache/spark/deploy/Client.scala | 66 +++++++++++-------- .../org/apache/spark/deploy/ClientSuite.scala | 46 ++++++++++++- 2 files changed, 81 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index a26a8997d8e7f..a6a8cf0481434 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -38,6 +38,7 @@ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils} */ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with ActorLogReceive with Logging { + import ClientActor._ private val masterActors = driverArgs.masters.map { m => context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system))) @@ -55,34 +56,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) // TODO: We could add an env variable here and intercept it in `sc.addJar` that would // truncate filesystem paths similar to what YARN does. For now, we just require // people call `addJar` assuming the jar is in the same directory. - val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" - - val classPathConf = "spark.driver.extraClassPath" - val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp => - cp.split(java.io.File.pathSeparator) - } - - val libraryPathConf = "spark.driver.extraLibraryPath" - val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp => - cp.split(java.io.File.pathSeparator) - } - - val extraJavaOptsConf = "spark.driver.extraJavaOptions" - val extraJavaOpts = sys.props.get(extraJavaOptsConf) - .map(Utils.splitCommandString).getOrElse(Seq.empty) - val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isNotClusterAuthSecretConf) - val javaOpts = sparkJavaOpts ++ extraJavaOpts - val command = new Command(mainClass, - Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions, - sys.env, classPathEntries, libraryPathEntries, javaOpts) - - val driverDescription = new DriverDescription( - driverArgs.jarUrl, - driverArgs.memory, - driverArgs.cores, - driverArgs.supervise, - command, - if (conf.authOn) conf.getClusterAuthSecret else None) + val driverDescription = buildDriverDescription(driverArgs, conf) // This assumes only one Master is active at a time for (masterActor <- masterActors) { @@ -175,6 +149,42 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) } } +private object ClientActor { + + // Exposed for testing + private def buildDriverDescription(driverArgs: ClientArguments, + conf: SparkConf): DriverDescription = { + val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" + + val classPathConf = "spark.driver.extraClassPath" + val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp => + cp.split(java.io.File.pathSeparator) + } + + val libraryPathConf = "spark.driver.extraLibraryPath" + val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp => + cp.split(java.io.File.pathSeparator) + } + + val extraJavaOptsConf = "spark.driver.extraJavaOptions" + val extraJavaOpts = 
sys.props.get(extraJavaOptsConf) + .map(Utils.splitCommandString).getOrElse(Seq.empty) + val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isNotClusterAuthSecretConf) + val javaOpts = sparkJavaOpts ++ extraJavaOpts + val command = new Command(mainClass, + Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions, + sys.env, classPathEntries, libraryPathEntries, javaOpts) + + new DriverDescription( + driverArgs.jarUrl, + driverArgs.memory, + driverArgs.cores, + driverArgs.supervise, + command, + if (conf.authOn) conf.getClusterAuthSecret else None) + } +} + /** * Executable utility for starting and terminating drivers inside of a standalone cluster. */ diff --git a/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala index 6a99dbca64f4b..fd02d23bff5c2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/ClientSuite.scala @@ -17,11 +17,51 @@ package org.apache.spark.deploy -import org.scalatest.Matchers +import org.scalatest.{Matchers, PrivateMethodTester} -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite, SecurityManager} + +class ClientSuite extends SparkFunSuite with Matchers with PrivateMethodTester { + + val buildDriverDesc = PrivateMethod[DriverDescription]('buildDriverDescription) + val driverArgs = new ClientArguments(Array( + "launch", + "spark://someHost:8080", + "http://someHost:8000/foo.jar", + "org.SomeClass")) + + test("Auth secret shouldn't appear in the command") { + val conf = new SparkConf + // set secret + conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, "This is the secret sauce") + + // auth is not set + var driverDesc = ClientActor invokePrivate buildDriverDesc(driverArgs, conf) + assert(driverDesc.appSecret === None) + assert(!driverDesc.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_CONF))) + assert(!driverDesc.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + + // auth is set to false + conf.set(SecurityManager.CLUSTER_AUTH_CONF, "false") + driverDesc = ClientActor invokePrivate buildDriverDesc(driverArgs, conf) + assert(driverDesc.appSecret === None) + assert(driverDesc.command.javaOpts.contains( + "-D" + SecurityManager.CLUSTER_AUTH_CONF + "=false")) + assert(!driverDesc.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + + // auth is set to true + conf.set(SecurityManager.CLUSTER_AUTH_CONF, "true") + driverDesc = ClientActor invokePrivate buildDriverDesc(driverArgs, conf) + assert(driverDesc.appSecret === Some("This is the secret sauce")) + assert(driverDesc.command.javaOpts.contains( + "-D" + SecurityManager.CLUSTER_AUTH_CONF + "=true")) + assert(!driverDesc.command.javaOpts.exists( + _.startsWith("-D" + SecurityManager.CLUSTER_AUTH_SECRET_CONF))) + } -class ClientSuite extends SparkFunSuite with Matchers { test("correctly validates driver jar URL's") { ClientArguments.isValidJarUrl("http://someHost:8080/foo.jar") should be (true) ClientArguments.isValidJarUrl("https://someHost:8080/foo.jar") should be (true) From bca18ac95e573b434efd96294870ae9d331af094 Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Fri, 5 Jun 2015 15:31:51 -0700 Subject: [PATCH 8/9] style update --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 06a6ccc677c4a..c7ebcb084ca91 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -338,7 +338,7 @@ private[spark] object Utils extends Logging { } /** - * If authentication is enabled, retrieve app secret from stdin, set it in conf and + * If authentication is enabled, retrieve app secret from stdin, set it in conf and * export it to system properties. * * Note: This mutates state in the given SparkConf and in this JVM's system properties. From 8ba406419e54b35ae5fa5547d41e80c3e0f43991 Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Sat, 6 Jun 2015 15:11:10 -0700 Subject: [PATCH 9/9] close i/o streams and disallow empty string to be secret key --- .../org/apache/spark/SecurityManager.scala | 4 +- .../scala/org/apache/spark/util/Utils.scala | 38 +++++++++++++------ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 3dbc6e81cbd00..aa2cd2caf9cbd 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -367,8 +367,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf) } else { // user must have set spark.authenticate.secret config sparkConf.getClusterAuthSecret match { - case Some(value) => value - case None => throw new Exception("Error: a secret key must be specified via the " + + case Some(value) if !value.isEmpty => value // empty string is not allowed + case _ => throw new Exception("Error: a secret key must be specified via the " + CLUSTER_AUTH_SECRET_CONF + " config") } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c7ebcb084ca91..456a5201ba5b5 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -332,9 +332,16 @@ private[spark] object Utils extends Logging { secret: Option[String], conf: SparkConf): Unit = for (value <- secret if conf.authOn) { - val out = new BufferedOutputStream(process.getOutputStream) - out.write(value.getBytes) - out.close + val out = new BufferedWriter(new OutputStreamWriter(process.getOutputStream, "UTF-8")) + try { + out.write(value) + } catch { + case e: IOException => + throw new SparkException("Failed to write out authentication key to " + + "child process' stdin", e) + } finally { + out.close + } } /** @@ -345,16 +352,23 @@ private[spark] object Utils extends Logging { */ def setAndExportAppSecretIfNeeded(conf: SparkConf): Unit = { if (conf.authOn) { - val in = new BufferedReader(new InputStreamReader(System.in)) - Option(in.readLine) match { - case Some(value) => - conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, value) - sys.props.update(SecurityManager.CLUSTER_AUTH_SECRET_CONF, value) - - case None => throw new Exception("Error: authentication is enabled but " + - "failed to obtain authentication key from stdin") + val in = new BufferedReader(new InputStreamReader(System.in, "UTF-8")) + try { + Option(in.readLine) match { + case Some(value) => + conf.set(SecurityManager.CLUSTER_AUTH_SECRET_CONF, value) + sys.props.update(SecurityManager.CLUSTER_AUTH_SECRET_CONF, value) + + case None => + throw new SparkException("Authentication is enabled but reading " + + "authentication key 
from stdin returned null") + } + } catch { + case e: IOException => + throw new SparkException("Failed to obtain authentication key from stdin", e) + } finally { + in.close } - in.close } }
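Taken together, ClientSuite, StandaloneRestServerSuite and SparkDeploySchedulerBackendSuite in this series all assert the same invariant. A condensed, self-contained sketch of it — Command, AppDesc and build below are illustrative stand-ins, not code from the patch:

object SecretPlacementSketch {
  case class Command(javaOpts: Seq[String])
  case class AppDesc(appSecret: Option[String], command: Command)

  def build(authOn: Boolean, secret: Option[String]): AppDesc = {
    // The launch command carries spark.authenticate itself, but never the secret key;
    // the secret travels in appSecret and is later written to the child's stdin.
    val opts = Seq(s"-Dspark.authenticate=$authOn")
    AppDesc(if (authOn) secret else None, Command(opts))
  }

  def main(args: Array[String]): Unit = {
    val desc = build(authOn = true, secret = Some("hush"))
    assert(desc.appSecret == Some("hush"))
    assert(!desc.command.javaOpts.exists(_.startsWith("-Dspark.authenticate.secret")))
  }
}

With spark.authenticate set to true the secret rides along in appSecret and is piped to the child over stdin, while the -D options on the command line never include spark.authenticate.secret.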