From e33a404e586fb992ad9de7ef751358385ba2a4a9 Mon Sep 17 00:00:00 2001 From: Mark Grover Date: Tue, 4 Aug 2015 07:33:18 -0700 Subject: [PATCH 1/4] Adding a new property, spark.common.extraClassPath, for adding entries to both the driver and executor classpath. Currently one can add extra jars to the driver classpath using the spark.driver.extraClassPath property. Similarly, the executor's extra classpath can be set using the spark.executor.extraClassPath property. However, many users (myself included) have often set one of these and missed the other, wasting a lot of time, and this JIRA is an effort to see if the status quo can be made better. --jars adds entries to the classpath used by the classloader, but there is no equivalent way of setting the system classpath of the driver and the executor. This change would allow users to override libraries shipped with Spark, or to add jars, such as a JDBC driver, that don't work unless they are added to the system classpath. --- bin/spark-shell | 3 ++- bin/spark-shell2.cmd | 3 ++- .../scala/org/apache/spark/SparkConf.scala | 22 +++++++++++++++ .../org/apache/spark/deploy/Client.scala | 7 ++++- .../org/apache/spark/deploy/SparkSubmit.scala | 2 ++ .../spark/deploy/SparkSubmitArguments.scala | 5 ++++ .../deploy/rest/StandaloneRestServer.scala | 5 +++- .../rest/SubmitRestProtocolRequest.scala | 2 ++ .../deploy/rest/mesos/MesosRestServer.scala | 5 +++- .../cluster/SparkDeploySchedulerBackend.scala | 6 +++-- .../mesos/CoarseMesosSchedulerBackend.scala | 3 ++- .../cluster/mesos/MesosSchedulerBackend.scala | 5 +++- .../spark/scheduler/local/LocalBackend.scala | 2 +- .../spark/deploy/SparkSubmitSuite.scala | 27 +++++++++++++++++++ .../deploy/rest/SubmitRestProtocolSuite.scala | 3 +++ docs/configuration.md | 9 +++++++ .../apache/spark/launcher/SparkLauncher.java | 3 +++ .../launcher/SparkSubmitCommandBuilder.java | 16 +++++++++-- .../SparkSubmitCommandBuilderSuite.java | 7 ++++- .../org/apache/spark/deploy/yarn/Client.scala | 3 ++- .../spark/deploy/yarn/ExecutorRunnable.scala | 3 ++- .../spark/deploy/yarn/YarnClusterSuite.scala | 3 +-- 22 files changed, 127 insertions(+), 17 deletions(-) diff --git a/bin/spark-shell b/bin/spark-shell index 00ab7afd118b5..d9a143d91386c 100755 --- a/bin/spark-shell +++ b/bin/spark-shell @@ -35,7 +35,8 @@ export _SPARK_CMD_USAGE="Usage: ./bin/spark-shell [options]" # so we need to add the "-Dscala.usejavacp=true" flag manually. We # do this specifically for the Spark shell because the scala REPL # has its own class loader, and any additional classpath specified -# through spark.driver.extraClassPath is not automatically propagated. +# through spark.driver.extraClassPath and/or spark.common.extraClassPath is not +# automatically propagated to the Spark shell. SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true" function main() { diff --git a/bin/spark-shell2.cmd b/bin/spark-shell2.cmd index b9b0f510d7f5d..67b5e68c75e21 100644 --- a/bin/spark-shell2.cmd +++ b/bin/spark-shell2.cmd @@ -24,7 +24,8 @@ rem SPARK-4161: scala does not assume use of the java classpath, rem so we need to add the "-Dscala.usejavacp=true" flag manually. We rem do this specifically for the Spark shell because the scala REPL rem has its own class loader, and any additional classpath specified -rem through spark.driver.extraClassPath is not automatically propagated. +rem through spark.driver.extraClassPath and/or spark.common.extraClassPath is not +rem automatically propagated to the Spark shell. 
if "x%SPARK_SUBMIT_OPTS%"=="x" ( set SPARK_SUBMIT_OPTS=-Dscala.usejavacp=true goto run_shell diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 08bab4bf2739f..ed3b9601066bc 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -17,6 +17,7 @@ package org.apache.spark +import java.io.File import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ @@ -457,6 +458,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { |Please instead use: | - ./spark-submit with --driver-class-path to augment the driver classpath | - spark.executor.extraClassPath to augment the executor classpath + | - spark.common.extraClassPath to augument the driver and executor classpath. + | When both spark.common.extraClassPath and spark.[executor|driver].extraClassPath + | are used together, spark.common.extraClassPath jars are put before the + | the spark.[executor|driver].extraClassPath jars in the user classpath. """.stripMargin logWarning(warning) @@ -479,6 +484,23 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n") } + def getDriverExtraClassPath() : Option[String] = { + // Concatenate common and driver classpath in that order + val commonExtraClassPath = getOption("spark.common.extraClassPath") + val driverExtraClassPath = getOption("spark.driver.extraClassPath") + Option((commonExtraClassPath ++ driverExtraClassPath).mkString(File.pathSeparator)).filter(_ + .nonEmpty) + } + + def getExecutorExtraClassPath() : Option[String] = { + // Concatenate common and executor classpath in that order + val commonExtraClassPath = getOption("spark.common.extraClassPath") + val executorExtraClassPath = getOption("spark.executor.extraClassPath") + Option((commonExtraClassPath ++ executorExtraClassPath).mkString(File.pathSeparator)).filter(_ + .nonEmpty) + } + + } private[spark] object SparkConf extends Logging { diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index f03875a3e8c89..9d8436596c0c1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -68,7 +68,12 @@ private class ClientEndpoint( val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" val classPathConf = "spark.driver.extraClassPath" - val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp => + val commonClassPathConf = "spark.common.extraClassPath" + + val driverClassPath = Option((sys.props.get(commonClassPathConf) ++ + sys.props.get(classPathConf)).mkString(java.io.File.pathSeparator)).filter(_.nonEmpty) + + val classPathEntries = driverClassPath.toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 31185c8e77def..adde01cc04486 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -406,6 +406,8 @@ object SparkSubmit { OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"), OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.driver.memory"), + OptionAssigner(args.commonExtraClassPath, ALL_CLUSTER_MGRS, 
ALL_DEPLOY_MODES, + sysProp = "spark.common.extraClassPath"), OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.driver.extraClassPath"), OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 44852ce4e84ac..0e0410b3e43ba 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -44,6 +44,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S var totalExecutorCores: String = null var propertiesFile: String = null var driverMemory: String = null + var commonExtraClassPath: String = null var driverExtraClassPath: String = null var driverExtraLibraryPath: String = null var driverExtraJavaOptions: String = null @@ -143,6 +144,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S .orElse(sparkProperties.get("spark.master")) .orElse(env.get("MASTER")) .orNull + commonExtraClassPath = Option(commonExtraClassPath) + .orElse(sparkProperties.get("spark.common.extraClassPath")) + .orNull driverExtraClassPath = Option(driverExtraClassPath) .orElse(sparkProperties.get("spark.driver.extraClassPath")) .orNull @@ -288,6 +292,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | propertiesFile $propertiesFile | driverMemory $driverMemory | driverCores $driverCores + | commonExtraClassPath $commonExtraClassPath | driverExtraClassPath $driverExtraClassPath | driverExtraLibraryPath $driverExtraLibraryPath | driverExtraJavaOptions $driverExtraJavaOptions diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index d5b9bcab1423f..7985ba90d75f3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -135,7 +135,10 @@ private[rest] class StandaloneSubmitRequestServlet( val driverMemory = sparkProperties.get("spark.driver.memory") val driverCores = sparkProperties.get("spark.driver.cores") val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") - val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") + val commonExtraClassPath = sparkProperties.get("spark.common.extraClassPath") + val driverSpecificExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") + val driverExtraClassPath = Option(( commonExtraClassPath ++ driverSpecificExtraClassPath) + .mkString(File.pathSeparator)).filter(_.nonEmpty) val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") val superviseDriver = sparkProperties.get("spark.driver.supervise") val appArgs = request.appArgs diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala index 0d50a768942ed..bed13e0859673 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala @@ -17,6 +17,8 @@ package org.apache.spark.deploy.rest +import java.io.File + import scala.util.Try import org.apache.spark.util.Utils diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index 868cc35d06ef3..9e9921ed2033e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -88,7 +88,10 @@ private[mesos] class MesosSubmitRequestServlet( // Optional fields val sparkProperties = request.sparkProperties val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") - val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") + val commonExtraClassPath = sparkProperties.get("spark.common.extraClassPath") + val driverSpecificExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") + val driverExtraClassPath = Option(( commonExtraClassPath ++ driverSpecificExtraClassPath) + .mkString(File.pathSeparator)).filter(_.nonEmpty) val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") val superviseDriver = sparkProperties.get("spark.driver.supervise") val driverMemory = sparkProperties.get("spark.driver.memory") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index bbe51b4a09a22..001450a1cc8f7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -61,8 +61,10 @@ private[spark] class SparkDeploySchedulerBackend( "--worker-url", "{{WORKER_URL}}") val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") .map(Utils.splitCommandString).getOrElse(Seq.empty) - val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath") - .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) + + val classPathEntries = sc.conf.getExecutorExtraClassPath().map(_.split(java.io.File + .pathSeparator).toSeq).getOrElse(Nil) + val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath") .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 15a0915708c7c..3adaf9d3fbe44 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -138,7 +138,8 @@ private[spark] class CoarseMesosSchedulerBackend( throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") } val environment = Environment.newBuilder() - val extraClassPath = conf.getOption("spark.executor.extraClassPath") + val extraClassPath = conf.getExecutorExtraClassPath() + extraClassPath.foreach { cp => environment.addVariables( Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 3f63ec1c5832f..c146c6044e3b8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -88,7 +88,10 @@ private[spark] class MesosSchedulerBackend( throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") } val environment = Environment.newBuilder() - sc.conf.getOption("spark.executor.extraClassPath").foreach { cp => + + val userClassPath = sc.conf.getExecutorExtraClassPath() + + userClassPath.foreach { cp => environment.addVariables( Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 4d48fcfea44e7..bbaf3df2b9f87 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -110,7 +110,7 @@ private[spark] class LocalBackend( * @param conf Spark configuration. */ def getUserClasspath(conf: SparkConf): Seq[URL] = { - val userClassPathStr = conf.getOption("spark.executor.extraClassPath") + val userClassPathStr = conf.getExecutorExtraClassPath() userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL) } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index aa78bfe30974c..5982798287de7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -497,6 +497,21 @@ class SparkSubmitSuite } // scalastyle:on println + test("spark.common.extraClassPath comes before spark.driver.extraClassPath in driver classpath") { + val commonExtraJar = TestUtils.createJarWithFiles(Map("test.resource" -> "COMMON")) + val driverExtraJar = TestUtils.createJarWithFiles(Map("test.resource" -> "DRIVER")) + val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER")) + val args = Seq( + "--class", CommonClasspathAddedTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--master", "local", + "--verbose", + "--conf", "spark.common.extraClassPath=" + commonExtraJar, + "--conf", "spark.driver.extraClassPath=" + driverExtraJar, + userJar.toString) + runSparkSubmit(args) + } + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. 
private def runSparkSubmit(args: Seq[String]): Unit = { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) @@ -594,3 +609,15 @@ object UserClasspathFirstTest { } } } + +object CommonClasspathAddedTest { + def main(args: Array[String]) { + val ccl = Thread.currentThread().getContextClassLoader() + val resource = ccl.getResourceAsStream("test.resource") + val bytes = ByteStreams.toByteArray(resource) + val contents = new String(bytes, 0, bytes.length, UTF_8) + if (contents != "COMMON") { + throw new SparkException("Should have read common resource, but instead read: " + contents) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala index 725b8848bc052..21e9db4941116 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala @@ -97,6 +97,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite { conf.set("spark.driver.cores", "180") conf.set("spark.driver.extraJavaOptions", " -Dslices=5 -Dcolor=mostly_red") conf.set("spark.driver.extraClassPath", "food-coloring.jar") + conf.set("spark.common.extraClassPath", "saffron.jar") conf.set("spark.driver.extraLibraryPath", "pickle.jar") conf.set("spark.driver.supervise", "false") conf.set("spark.executor.memory", "256m") @@ -131,6 +132,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite { assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") === " -Dslices=5 -Dcolor=mostly_red") assert(newMessage.sparkProperties("spark.driver.extraClassPath") === "food-coloring.jar") + assert(newMessage.sparkProperties("spark.common.extraClassPath") === "saffron.jar") assert(newMessage.sparkProperties("spark.driver.extraLibraryPath") === "pickle.jar") assert(newMessage.sparkProperties("spark.driver.supervise") === "false") assert(newMessage.sparkProperties("spark.executor.memory") === "256m") @@ -250,6 +252,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite { | "spark.files" : "fireball.png", | "spark.driver.cores" : "180", | "spark.driver.extraJavaOptions" : " -Dslices=5 -Dcolor=mostly_red", + | "spark.common.extraClassPath" : "saffron.jar", | "spark.executor.memory" : "256m", | "spark.driver.extraClassPath" : "food-coloring.jar" | } diff --git a/docs/configuration.md b/docs/configuration.md index 24b606356a149..b536241c2bb86 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -246,6 +246,15 @@ Apart from these, the following properties are also available, and may be useful This is used in cluster mode only. + + spark.common.extraClassPath + (none) + + Extra classpath entries to append to the classpath of both the executors and the driver. When both + spark.common.extraClassPath and spark.[executor|driver].extraClassPath are set, the + spark.common.extraClassPath entries precede the spark.[executor|driver].extraClassPath entries. + + spark.executor.extraClassPath (none) diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java index c0f89c9230692..9d17d26118261 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java @@ -57,6 +57,9 @@ public class SparkLauncher { /** Configuration key for the number of executor CPU cores. 
*/ public static final String EXECUTOR_CORES = "spark.executor.cores"; + /** Configuration key for both the driver and executor classpath. */ + public static final String COMMON_EXTRA_CLASSPATH = "spark.common.extraClassPath"; + private final SparkSubmitCommandBuilder builder; public SparkLauncher() { diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java index 87c43aa9980e1..cba34748265c4 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -190,8 +190,20 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env) throws IOE // modified to cover the driver's configuration. Properties props = loadPropertiesFile(); boolean isClientMode = isClientMode(props); - String extraClassPath = isClientMode ? - firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_CLASSPATH, conf, props) : null; + String extraClassPath = null; + if (isClientMode) { + String commonExtraClassPath = firstNonEmptyValue(SparkLauncher.COMMON_EXTRA_CLASSPATH, conf, props); + String driverExtraClassPath = firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_CLASSPATH, conf, props); + + // If common extra classpath was found, add it to the combined extra classpath. + if (commonExtraClassPath != null) { + extraClassPath = commonExtraClassPath; + } + // If driver specific extra classpath was found, append it to the combined extra classpath. + if (driverExtraClassPath != null) { + extraClassPath = (extraClassPath != null) ? extraClassPath + File.pathSeparator + driverExtraClassPath : driverExtraClassPath; + } + } List<String> cmd = buildJavaCommand(extraClassPath); // Take Thrift Server as daemon diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java index 7329ac9f7fb8c..245774e704c46 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java @@ -166,6 +166,7 @@ private void testCmdBuilder(boolean isDriver) throws Exception { launcher.appArgs.add("bar"); launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g"); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver"); + launcher.conf.put(SparkLauncher.COMMON_EXTRA_CLASSPATH, "/common"); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver -XX:MaxPermSize=256m"); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native"); launcher.conf.put("spark.foo", "foo"); @@ -201,9 +202,13 @@ private void testCmdBuilder(boolean isDriver) throws Exception { String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator)); if (isDriver) { - assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp)); + String expectedCustomCp = "/driver"; + assertTrue("Driver classpath should contain " + expectedCustomCp, contains(expectedCustomCp, cp)); + expectedCustomCp = "/common"; + assertTrue("Driver classpath should contain " + expectedCustomCp, contains(expectedCustomCp, cp)); } else { assertFalse("Driver classpath should not be in command.", contains("/driver", cp)); + assertFalse("Common classpath should not be in command.", contains("/common", cp)); } String libPath = env.get(CommandBuilderUtils.getLibPathEnvName()); diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index fc11bbf97e2ec..f2f04cff9cdbd 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -511,7 +511,8 @@ private[spark] class Client( pySparkArchives: Seq[String]): HashMap[String, String] = { logInfo("Setting up the launch environment for our AM container") val env = new HashMap[String, String]() - val extraCp = sparkConf.getOption("spark.driver.extraClassPath") + val extraCp = sparkConf.getDriverExtraClassPath() + populateClasspath(args, yarnConf, sparkConf, env, true, extraCp) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 52580deb372c2..a8f662f52a429 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -283,7 +283,8 @@ class ExecutorRunnable( private def prepareEnvironment(container: Container): HashMap[String, String] = { val env = new HashMap[String, String]() - val extraCp = sparkConf.getOption("spark.executor.extraClassPath") + val extraCp = sparkConf.getExecutorExtraClassPath() + Client.populateClasspath(null, yarnConf, sparkConf, env, false, extraCp) sparkConf.getExecutorEnv.foreach { case (key, value) => diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index eb6e1fd370620..4ae2f8e40a7af 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -242,8 +242,7 @@ class YarnClusterSuite extends SparkFunSuite with BeforeAndAfterAll with Matcher sys.props("java.class.path") + File.pathSeparator + extraClassPath.mkString(File.pathSeparator) - props.setProperty("spark.driver.extraClassPath", childClasspath) - props.setProperty("spark.executor.extraClassPath", childClasspath) + props.setProperty("spark.common.extraClassPath", childClasspath) // SPARK-4267: make sure java options are propagated correctly. 
props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"") From 9b3fec6fb05aedd034e3486d13149bb88810869a Mon Sep 17 00:00:00 2001 From: Mark Grover Date: Wed, 19 Aug 2015 10:50:10 -0700 Subject: [PATCH 2/4] Incorporating feedback from the review --- core/src/main/scala/org/apache/spark/SparkConf.scala | 4 ++-- core/src/main/scala/org/apache/spark/deploy/Client.scala | 4 ++-- .../org/apache/spark/deploy/rest/StandaloneRestServer.scala | 2 +- .../apache/spark/deploy/rest/SubmitRestProtocolRequest.scala | 2 -- .../org/apache/spark/deploy/rest/mesos/MesosRestServer.scala | 2 +- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 4 ++-- .../spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 4 +--- 7 files changed, 9 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index ed3b9601066bc..a16d8af552f3a 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -488,7 +488,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { // Concatenate common and driver classpath in that order val commonExtraClassPath = getOption("spark.common.extraClassPath") val driverExtraClassPath = getOption("spark.driver.extraClassPath") - Option((commonExtraClassPath ++ driverExtraClassPath).mkString(File.pathSeparator)).filter(_ + Some((commonExtraClassPath ++ driverExtraClassPath).mkString(File.pathSeparator)).filter(_ .nonEmpty) } @@ -496,7 +496,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { // Concatenate common and executor classpath in that order val commonExtraClassPath = getOption("spark.common.extraClassPath") val executorExtraClassPath = getOption("spark.executor.extraClassPath") - Option((commonExtraClassPath ++ executorExtraClassPath).mkString(File.pathSeparator)).filter(_ + Some((commonExtraClassPath ++ executorExtraClassPath).mkString(File.pathSeparator)).filter(_ .nonEmpty) } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 9d8436596c0c1..708f7f585187d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -67,11 +67,11 @@ private class ClientEndpoint( // people call `addJar` assuming the jar is in the same directory. 
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" - val classPathConf = "spark.driver.extraClassPath" + val driverClassPathConf = "spark.driver.extraClassPath" val commonClassPathConf = "spark.common.extraClassPath" val driverClassPath = Option((sys.props.get(commonClassPathConf) ++ - sys.props.get(classPathConf)).mkString(java.io.File.pathSeparator)).filter(_.nonEmpty) + sys.props.get(driverClassPathConf)).mkString(java.io.File.pathSeparator)).filter(_.nonEmpty) val classPathEntries = driverClassPath.toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 7985ba90d75f3..0b38f496a97c1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -137,7 +137,7 @@ private[rest] class StandaloneSubmitRequestServlet( val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") val commonExtraClassPath = sparkProperties.get("spark.common.extraClassPath") val driverSpecificExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") - val driverExtraClassPath = Option(( commonExtraClassPath ++ driverSpecificExtraClassPath) + val driverExtraClassPath = Option((commonExtraClassPath ++ driverSpecificExtraClassPath) .mkString(File.pathSeparator)).filter(_.nonEmpty) val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") val superviseDriver = sparkProperties.get("spark.driver.supervise") diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala index bed13e0859673..0d50a768942ed 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala @@ -17,8 +17,6 @@ package org.apache.spark.deploy.rest -import java.io.File - import scala.util.Try import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index 9e9921ed2033e..e408671e2709f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -90,7 +90,7 @@ private[mesos] class MesosSubmitRequestServlet( val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") val commonExtraClassPath = sparkProperties.get("spark.common.extraClassPath") val driverSpecificExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") - val driverExtraClassPath = Option(( commonExtraClassPath ++ driverSpecificExtraClassPath) + val driverExtraClassPath = Option((commonExtraClassPath ++ driverSpecificExtraClassPath) .mkString(File.pathSeparator)).filter(_.nonEmpty) val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") val superviseDriver = sparkProperties.get("spark.driver.supervise") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 001450a1cc8f7..61fd09814ba1a 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -62,8 +62,8 @@ private[spark] class SparkDeploySchedulerBackend( val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") .map(Utils.splitCommandString).getOrElse(Seq.empty) - val classPathEntries = sc.conf.getExecutorExtraClassPath().map(_.split(java.io.File - .pathSeparator).toSeq).getOrElse(Nil) + val classPathEntries = sc.conf.getExecutorExtraClassPath() + .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath") .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c146c6044e3b8..838c6624f9ca9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -89,9 +89,7 @@ private[spark] class MesosSchedulerBackend( } val environment = Environment.newBuilder() - val userClassPath = sc.conf.getExecutorExtraClassPath() - - userClassPath.foreach { cp => + sc.conf.getExecutorExtraClassPath().foreach { cp => environment.addVariables( Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) } From 87407517c333034041c4312a76b0641dafe4fd2e Mon Sep 17 00:00:00 2001 From: Mark Grover Date: Wed, 19 Aug 2015 12:08:03 -0700 Subject: [PATCH 3/4] Minor formatting changes. Changing it back to Option instead of Some() --- core/src/main/scala/org/apache/spark/SparkConf.scala | 8 ++++---- core/src/main/scala/org/apache/spark/deploy/Client.scala | 6 ++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index a16d8af552f3a..d3e603c45b9d4 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -488,16 +488,16 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { // Concatenate common and driver classpath in that order val commonExtraClassPath = getOption("spark.common.extraClassPath") val driverExtraClassPath = getOption("spark.driver.extraClassPath") - Some((commonExtraClassPath ++ driverExtraClassPath).mkString(File.pathSeparator)).filter(_ - .nonEmpty) + Option((commonExtraClassPath ++ driverExtraClassPath).mkString(File.pathSeparator)) + .filter(_.nonEmpty) } def getExecutorExtraClassPath() : Option[String] = { // Concatenate common and executor classpath in that order val commonExtraClassPath = getOption("spark.common.extraClassPath") val executorExtraClassPath = getOption("spark.executor.extraClassPath") - Some((commonExtraClassPath ++ executorExtraClassPath).mkString(File.pathSeparator)).filter(_ - .nonEmpty) + Option((commonExtraClassPath ++ executorExtraClassPath).mkString(File.pathSeparator)) + .filter(_.nonEmpty) } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 708f7f585187d..74674bb9b5977 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -70,8 +70,10 @@ private class 
ClientEndpoint( val driverClassPathConf = "spark.driver.extraClassPath" val commonClassPathConf = "spark.common.extraClassPath" - val driverClassPath = Option((sys.props.get(commonClassPathConf) ++ - sys.props.get(driverClassPathConf)).mkString(java.io.File.pathSeparator)).filter(_.nonEmpty) + val driverClassPath = Option( + (sys.props.get(commonClassPathConf) ++ sys.props.get(driverClassPathConf)) + .mkString(java.io.File.pathSeparator) + ).filter(_.nonEmpty) val classPathEntries = driverClassPath.toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) From 5f62d39aadafc1780570c368fe6ceeaa39186758 Mon Sep 17 00:00:00 2001 From: Mark Grover Date: Wed, 19 Aug 2015 13:12:55 -0700 Subject: [PATCH 4/4] Converting Option(...) calls to Some(...) for clarity --- core/src/main/scala/org/apache/spark/SparkConf.scala | 4 ++-- core/src/main/scala/org/apache/spark/deploy/Client.scala | 2 +- .../org/apache/spark/deploy/rest/StandaloneRestServer.scala | 2 +- .../org/apache/spark/deploy/rest/mesos/MesosRestServer.scala | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index d3e603c45b9d4..6d5d24f3fcc6b 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -488,7 +488,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { // Concatenate common and driver classpath in that order val commonExtraClassPath = getOption("spark.common.extraClassPath") val driverExtraClassPath = getOption("spark.driver.extraClassPath") - Option((commonExtraClassPath ++ driverExtraClassPath).mkString(File.pathSeparator)) + Some((commonExtraClassPath ++ driverExtraClassPath).mkString(File.pathSeparator)) .filter(_.nonEmpty) } @@ -496,7 +496,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { // Concatenate common and executor classpath in that order val commonExtraClassPath = getOption("spark.common.extraClassPath") val executorExtraClassPath = getOption("spark.executor.extraClassPath") - Option((commonExtraClassPath ++ executorExtraClassPath).mkString(File.pathSeparator)) + Some((commonExtraClassPath ++ executorExtraClassPath).mkString(File.pathSeparator)) .filter(_.nonEmpty) } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 74674bb9b5977..1110b6d751b19 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -70,7 +70,7 @@ private class ClientEndpoint( val driverClassPathConf = "spark.driver.extraClassPath" val commonClassPathConf = "spark.common.extraClassPath" - val driverClassPath = Option( + val driverClassPath = Some( (sys.props.get(commonClassPathConf) ++ sys.props.get(driverClassPathConf)) .mkString(java.io.File.pathSeparator) ).filter(_.nonEmpty) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 0b38f496a97c1..4e6cf830617dc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -137,7 +137,7 @@ private[rest] class StandaloneSubmitRequestServlet( val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") val commonExtraClassPath = 
sparkProperties.get("spark.common.extraClassPath") val driverSpecificExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") - val driverExtraClassPath = Option((commonExtraClassPath ++ driverSpecificExtraClassPath) + val driverExtraClassPath = Some((commonExtraClassPath ++ driverSpecificExtraClassPath) .mkString(File.pathSeparator)).filter(_.nonEmpty) val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") val superviseDriver = sparkProperties.get("spark.driver.supervise") diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index e408671e2709f..87691b9092c4f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -90,7 +90,7 @@ private[mesos] class MesosSubmitRequestServlet( val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") val commonExtraClassPath = sparkProperties.get("spark.common.extraClassPath") val driverSpecificExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") - val driverExtraClassPath = Option((commonExtraClassPath ++ driverSpecificExtraClassPath) + val driverExtraClassPath = Some((commonExtraClassPath ++ driverSpecificExtraClassPath) .mkString(File.pathSeparator)).filter(_.nonEmpty) val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") val superviseDriver = sparkProperties.get("spark.driver.supervise")
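
To make the precedence rule concrete, here is a minimal, self-contained Scala sketch of the merge logic these patches add: common entries are concatenated ahead of the role-specific entries, joined with the platform path separator, and an all-empty result collapses to None. The mergeExtraClassPath helper and the plain Map standing in for SparkConf below are illustrative assumptions, not the patched Spark code itself; the real implementation lives in SparkConf.getDriverExtraClassPath / getExecutorExtraClassPath above.

import java.io.File

object ClassPathPrecedenceSketch {
  // Hypothetical stand-in for SparkConf.getDriverExtraClassPath /
  // getExecutorExtraClassPath: common entries come first, role-specific
  // entries follow, joined by the path separator; empty collapses to None.
  def mergeExtraClassPath(common: Option[String], specific: Option[String]): Option[String] =
    Some((common ++ specific).mkString(File.pathSeparator)).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    // Illustrative properties; paths are made up for the example.
    val props = Map(
      "spark.common.extraClassPath" -> "/opt/jars/jdbc-driver.jar",
      "spark.driver.extraClassPath" -> "/opt/jars/driver-only.jar")

    // Driver classpath: the common jar precedes the driver-specific jar.
    println(mergeExtraClassPath(
      props.get("spark.common.extraClassPath"),
      props.get("spark.driver.extraClassPath")))
    // Prints Some(/opt/jars/jdbc-driver.jar:/opt/jars/driver-only.jar) on Unix.

    // With neither property set, the result is None rather than Some("").
    println(mergeExtraClassPath(None, None))
  }
}

Under these patches, a single --conf spark.common.extraClassPath=/path/to/extra.jar on spark-submit should then reach both the driver and executor system classpaths, avoiding the set-one-but-miss-the-other failure described in the commit message of PATCH 1/4.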