diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5e0eaa478547c..85a24acb97c07 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -384,6 +384,7 @@ class SparkContext(config: SparkConf) extends Logging {
     try {
       _conf = config.clone()
       _conf.validateSettings()
+      _conf.set("spark.app.startTime", startTime.toString)

       if (!_conf.contains("spark.master")) {
         throw new SparkException("A master URL must be set in your configuration")
@@ -492,11 +493,17 @@ class SparkContext(config: SparkConf) extends Logging {

     // Add each JAR given through the constructor
     if (jars != null) {
-      jars.foreach(addJar)
+      jars.foreach(jar => addJar(jar, true))
+      if (addedJars.nonEmpty) {
+        _conf.set("spark.app.initial.jar.urls", addedJars.keys.toSeq.mkString(","))
+      }
     }

     if (files != null) {
-      files.foreach(addFile)
+      files.foreach(file => addFile(file, false, true))
+      if (addedFiles.nonEmpty) {
+        _conf.set("spark.app.initial.file.urls", addedFiles.keys.toSeq.mkString(","))
+      }
     }

     _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
@@ -1500,7 +1507,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addFile(path: String): Unit = {
-    addFile(path, false)
+    addFile(path, false, false)
   }

   /**
@@ -1522,6 +1529,10 @@ class SparkContext(config: SparkConf) extends Logging {
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
+    addFile(path, recursive, false)
+  }
+
+  private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
     val uri = new Path(path).toUri
     val schemeCorrectedURI = uri.getScheme match {
       case null => new File(path).getCanonicalFile.toURI
@@ -1559,7 +1570,7 @@ class SparkContext(config: SparkConf) extends Logging {
         path
       }
     }
-    val timestamp = System.currentTimeMillis
+    val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
     if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
       logInfo(s"Added file $path at $key with timestamp $timestamp")
       // Fetch the file locally so that closures which are run on the driver can still use the
@@ -1569,7 +1580,7 @@ class SparkContext(config: SparkConf) extends Logging {
       postEnvironmentUpdate()
     } else {
       logWarning(s"The path $path has been added already. Overwriting of added paths " +
-        "is not supported in the current version.")
+          "is not supported in the current version.")
     }
   }

@@ -1840,6 +1851,10 @@ class SparkContext(config: SparkConf) extends Logging {
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addJar(path: String): Unit = {
+    addJar(path, false)
+  }
+
+  private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
     def addLocalJarFile(file: File): String = {
       try {
         if (!file.exists()) {
@@ -1904,7 +1919,7 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
     if (key != null) {
-      val timestamp = System.currentTimeMillis
+      val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
       if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
         logInfo(s"Added JAR $path at $key with timestamp $timestamp")
         postEnvironmentUpdate()
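The two conf keys written above, spark.app.startTime and spark.app.initial.{jar,file}.urls, are the handshake this patch sets up between the driver and executors. Below is a minimal sketch, not part of the patch, of what an application can observe after SparkContext initialization; the jar path is hypothetical and must point at an existing file for a URL to be recorded.

import org.apache.spark.{SparkConf, SparkContext}

object InitialUrlsCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("initial-urls-check")
      .setJars(Seq("/tmp/myplugin.jar")) // hypothetical jar; addJar skips missing files
    val sc = new SparkContext(conf)
    // Populated by the patched constructor: comma-separated URLs served by the
    // driver's file server, and the app start time used as their timestamp.
    println(sc.getConf.get("spark.app.initial.jar.urls", ""))
    println(sc.getConf.get("spark.app.startTime", ""))
    sc.stop()
  }
}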
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d22002917472a..9a191e1e84fad 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -220,6 +220,20 @@ private[spark] class Executor(

   heartbeater.start()

+  private val appStartTime = conf.getLong("spark.app.startTime", 0)
+
+  // To allow users to distribute plugins and their required files
+  // specified by --jars and --files on application submission, those jars/files should be
+  // downloaded and added to the class loader via updateDependencies.
+  // This should be done before plugin initialization below
+  // because executors search plugins from the class loader and initialize them.
+  private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
+    conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
+      Map(urls.split(",").map(url => (url, appStartTime)): _*)
+    }.getOrElse(Map.empty)
+  }
+  updateDependencies(initialUserFiles, initialUserJars)
+
   // Plugins need to load using a class loader that includes the executor's user classpath.
   // Plugins also needs to be initialized after the heartbeater started
   // to avoid blocking to send heartbeat (see SPARK-32175).
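The Seq(initialUserJars, initialUserFiles) destructuring above is compact, so here is a standalone sketch of the same map construction that can be run outside Spark; the URLs and timestamp are illustrative values, not real application state.

object InitialDepsSketch {
  def main(args: Array[String]): Unit = {
    val appStartTime = 1594000000000L // would come from "spark.app.startTime"
    val conf = Map(
      "spark.app.initial.jar.urls" ->
        "spark://host:1234/jars/plugin.jar,spark://host:1234/jars/dep.jar")
    // One pass builds both maps: URL -> timestamp, keyed off the two conf entries.
    val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
      conf.get(s"spark.app.initial.$key.urls").map { urls =>
        Map(urls.split(",").map(url => (url, appStartTime)): _*)
      }.getOrElse(Map.empty[String, Long])
    }
    println(initialUserJars)  // both jar URLs stamped with the app start time
    println(initialUserFiles) // empty: no file key was set in this sketch
  }
}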
diff --git a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
index 0f489fb219010..b188ee16b97d0 100644
--- a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
@@ -28,6 +28,7 @@

 import org.apache.spark.api.java.*;
 import org.apache.spark.*;
+import org.apache.spark.util.Utils;

 /**
  * Java apps can use both Java-friendly JavaSparkContext and Scala SparkContext.
@@ -35,14 +36,16 @@ public class JavaSparkContextSuite implements Serializable {

   @Test
-  public void javaSparkContext() {
+  public void javaSparkContext() throws IOException {
+    File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
+    String dummyJarFile = File.createTempFile(tempDir.toString(), "jarFile").toString();
     String[] jars = new String[] {};
     java.util.Map<String, String> environment = new java.util.HashMap<>();

     new JavaSparkContext(new SparkConf().setMaster("local").setAppName("name")).stop();
     new JavaSparkContext("local", "name", new SparkConf()).stop();
     new JavaSparkContext("local", "name").stop();
-    new JavaSparkContext("local", "name", "sparkHome", "jarFile").stop();
+    new JavaSparkContext("local", "name", "sparkHome", dummyJarFile).stop();
     new JavaSparkContext("local", "name", "sparkHome", jars).stop();
     new JavaSparkContext("local", "name", "sparkHome", jars, environment).stop();
   }
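The literal "jarFile" argument had to become a real temp file because constructor-supplied jars now flow through addJar during SparkContext initialization, where a nonexistent path is not silently carried along. Below is a sketch of producing a valid, manifest-only jar without Spark's TestUtils, under the assumption that any well-formed jar passes this validation; names are arbitrary.

import java.io.{File, FileOutputStream}
import java.util.jar.{JarOutputStream, Manifest}

object EmptyJar {
  // Writes a jar containing only META-INF/MANIFEST.MF into dir and returns it.
  def create(dir: File, name: String): File = {
    val jarFile = new File(dir, name)
    val out = new JarOutputStream(new FileOutputStream(jarFile), new Manifest())
    out.close() // a manifest-only jar is enough for path-validation purposes
    jarFile
  }
}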
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 35311d372e478..b5b3751439750 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,7 +570,8 @@ class SparkSubmitSuite
       }
     }

-    val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
+    val dummyJarFile = TestUtils.createJarWithClasses(Seq.empty)
+    val clArgs2 = Seq("--class", "org.SomeClass", dummyJarFile.toString)
     val appArgs2 = new SparkSubmitArguments(clArgs2)
     val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
     assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
@@ -1220,6 +1221,86 @@ class SparkSubmitSuite
     testRemoteResources(enableHttpFs = true, forceDownloadSchemes = Seq("*"))
   }

+  test("SPARK-32119: Jars and files should be loaded when Executors launch for plugins") {
+    val tempDir = Utils.createTempDir()
+    val tempFileName = "test.txt"
+    val tempFile = new File(tempDir, tempFileName)
+
+    // scalastyle:off println
+    Utils.tryWithResource {
+      new PrintWriter(tempFile)
+    } { writer =>
+      writer.println("SparkPluginTest")
+    }
+    // scalastyle:on println
+
+    val sparkPluginCodeBody =
+      """
+        |@Override
+        |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
+        |  return new TestExecutorPlugin();
+        |}
+        |
+        |@Override
+        |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
+      """.stripMargin
+    val executorPluginCodeBody =
+      s"""
+        |@Override
+        |public void init(
+        |    org.apache.spark.api.plugin.PluginContext ctx,
+        |    java.util.Map<String, String> extraConf) {
+        |  String str = null;
+        |  try (java.io.BufferedReader reader =
+        |    new java.io.BufferedReader(new java.io.InputStreamReader(
+        |      new java.io.FileInputStream("$tempFileName")))) {
+        |    str = reader.readLine();
+        |  } catch (java.io.IOException e) {
+        |    throw new RuntimeException(e);
+        |  } finally {
+        |    assert "SparkPluginTest".equals(str);
+        |  }
+        |}
+      """.stripMargin
+
+    val compiledExecutorPlugin = TestUtils.createCompiledClass(
+      "TestExecutorPlugin",
+      tempDir,
+      "",
+      null,
+      Seq.empty,
+      Seq("org.apache.spark.api.plugin.ExecutorPlugin"),
+      executorPluginCodeBody)
+
+    val thisClassPath =
+      sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
+    val compiledSparkPlugin = TestUtils.createCompiledClass(
+      "TestSparkPlugin",
+      tempDir,
+      "",
+      null,
+      Seq(tempDir.toURI.toURL) ++ thisClassPath,
+      Seq("org.apache.spark.api.plugin.SparkPlugin"),
+      sparkPluginCodeBody)
+
+    val jarUrl = TestUtils.createJar(
+      Seq(compiledSparkPlugin, compiledExecutorPlugin),
+      new File(tempDir, "testplugin.jar"))
+
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val unusedFile = Files.createTempFile(tempDir.toPath, "unused", null)
+    val args = Seq(
+      "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local-cluster[1,1,1024]",
+      "--conf", "spark.plugins=TestSparkPlugin",
+      "--conf", "spark.ui.enabled=false",
+      "--jars", jarUrl.toString + "," + unusedJar.toString,
+      "--files", tempFile.toString + "," + unusedFile.toString,
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   private def testRemoteResources(
       enableHttpFs: Boolean,
       forceDownloadSchemes: Seq[String] = Nil): Unit = {
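For context, the new test above corresponds to a user scenario like the following: a plugin distributed via --jars whose executor side reads a file distributed via --files during init. This is a sketch with illustrative class and file names, not code from the patch.

import java.util.{Map => JMap}
import scala.io.Source
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

class MySparkPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null
  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // "settings.txt" arrives in the executor's working directory via --files.
      // Before this patch, plugin init could run before the file was downloaded.
      val src = Source.fromFile("settings.txt")
      try require(src.getLines().next().nonEmpty) finally src.close()
    }
  }
}
// Submitted, conceptually, with:
//   spark-submit --jars myplugin.jar --files settings.txt \
//     --conf spark.plugins=MySparkPlugin ...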