From 69a9be1e668a53423facf98f3d9fb4dc63652445 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 29 Jun 2020 03:40:12 +0900 Subject: [PATCH 1/8] Fixed for ExecutorPlugin working with Standalone Cluster. --- .../scala/org/apache/spark/SparkContext.scala | 21 +++-- .../scala/org/apache/spark/TestUtils.scala | 13 ++- .../org/apache/spark/executor/Executor.scala | 10 +++ .../scala/org/apache/spark/util/Utils.scala | 9 ++ .../spark/deploy/SparkSubmitSuite.scala | 85 +++++++++++++++++++ 5 files changed, 129 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 38d7319b1f0ef..06ab56e48d8bc 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -379,6 +379,7 @@ class SparkContext(config: SparkConf) extends Logging { try { _conf = config.clone() _conf.validateSettings() + _conf.set("spark.app.startTime", startTime.toString) if (!_conf.contains("spark.master")) { throw new SparkException("A master URL must be set in your configuration") @@ -487,11 +488,11 @@ class SparkContext(config: SparkConf) extends Logging { // Add each JAR given through the constructor if (jars != null) { - jars.foreach(addJar) + jars.foreach(jar => addJar(jar, true)) } if (files != null) { - files.foreach(addFile) + files.foreach(file => addFile(file, false, true)) } _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key) @@ -1495,7 +1496,7 @@ class SparkContext(config: SparkConf) extends Logging { * @note A path can be added only once. Subsequent additions of the same path are ignored. */ def addFile(path: String): Unit = { - addFile(path, false) + addFile(path, false, false) } /** @@ -1517,6 +1518,10 @@ class SparkContext(config: SparkConf) extends Logging { * @note A path can be added only once. Subsequent additions of the same path are ignored. */ def addFile(path: String, recursive: Boolean): Unit = { + addFile(path, recursive, false) + } + + private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = { val uri = new Path(path).toUri val schemeCorrectedURI = uri.getScheme match { case null => new File(path).getCanonicalFile.toURI @@ -1554,7 +1559,7 @@ class SparkContext(config: SparkConf) extends Logging { path } } - val timestamp = System.currentTimeMillis + val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis if (addedFiles.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added file $path at $key with timestamp $timestamp") // Fetch the file locally so that closures which are run on the driver can still use the @@ -1564,7 +1569,7 @@ class SparkContext(config: SparkConf) extends Logging { postEnvironmentUpdate() } else { logWarning(s"The path $path has been added already. Overwriting of added paths " + - "is not supported in the current version.") + "is not supported in the current version.") } } @@ -1835,6 +1840,10 @@ class SparkContext(config: SparkConf) extends Logging { * @note A path can be added only once. Subsequent additions of the same path are ignored. 
*/ def addJar(path: String): Unit = { + addJar(path, false) + } + + private def addJar(path: String, addedOnSubmit: Boolean): Unit = { def addLocalJarFile(file: File): String = { try { if (!file.exists()) { @@ -1899,7 +1908,7 @@ class SparkContext(config: SparkConf) extends Logging { } } if (key != null) { - val timestamp = System.currentTimeMillis + val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis if (addedJars.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added JAR $path at $key with timestamp $timestamp") postEnvironmentUpdate() diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index d459627930f4c..1b652eed0215d 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -179,11 +179,18 @@ private[spark] object TestUtils { destDir: File, toStringValue: String = "", baseClass: String = null, - classpathUrls: Seq[URL] = Seq.empty): File = { + classpathUrls: Seq[URL] = Seq.empty, + preClassDefinitionBlock: String = "", + implementsClasses: Seq[String] = Seq.empty, + extraCodeBody: String = ""): File = { val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") + val implementsText = implementsClasses.map(", " + _).mkString val sourceFile = new JavaSourceFromString(className, - "public class " + className + extendsText + " implements java.io.Serializable {" + - " @Override public String toString() { return \"" + toStringValue + "\"; }}") + preClassDefinitionBlock + + "public class " + className + extendsText + " implements java.io.Serializable" + + implementsText + " {" + + " @Override public String toString() { return \"" + toStringValue + "\"; }" + + extraCodeBody + " }") createCompiledClass(className, destDir, sourceFile, classpathUrls) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index c8b1afeebac0d..73e7e7fd4d77f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -154,6 +154,16 @@ private[spark] class Executor( // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too. env.serializerManager.setDefaultClassLoader(replClassLoader) + private val appStartTime = conf.getLong("spark.app.startTime", 0) + + // Jars and files specified by spark.jars and spark.files. + private val initialUserJars = + Map(Utils.getUserJars(conf).map(jar => (jar, appStartTime)): _*) + private val initialUserFiles = + Map(Utils.getUserFiles(conf).map(file => (file, appStartTime)): _*) + + updateDependencies(initialUserFiles, initialUserJars) + // Plugins need to load using a class loader that includes the executor's user classpath private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) { PluginContainer(env, resources.asJava) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 9636fe88c77c2..39a7b1c28bd32 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2572,6 +2572,15 @@ private[spark] object Utils extends Logging { conf.get(JARS).filter(_.nonEmpty) } + /** + * Return the files pointed by the "spark.files" property. Spark internally will distribute + * these files through file server. 
In the YARN mode, it will return an empty list, since YARN + * has its own mechanism to distribute files. + */ + def getUserFiles(conf: SparkConf): Seq[String] = { + conf.get(FILES).filter(_.nonEmpty) + } + /** * Return the local jar files which will be added to REPL's classpath. These jar files are * specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local by diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index fd2d1f56ed9b6..f00de81958fdb 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -1218,6 +1218,91 @@ class SparkSubmitSuite testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*")) } + + test("SPARK-32119: Jars and files should be loaded when Executors launch for plugins") { + val tempDir = Utils.createTempDir() + val tempFileName = "test.txt" + val tempFile = new File(tempDir, tempFileName) + + // scalastyle:off println + Utils.tryWithResource { + new PrintWriter(tempFile) + } { writer => + writer.println("SparkPluginTest") + } + // scalastyle:on println + + val importStatements = + """ + |import java.io.*; + |import java.util.Map; + |import org.apache.spark.api.plugin.*; + """.stripMargin + val sparkPluginCodeBody = + """ + |@Override + |public ExecutorPlugin executorPlugin() { + | return new TestExecutorPlugin(); + |} + | + |@Override + |public DriverPlugin driverPlugin() { return null; } + """.stripMargin + val executorPluginCodeBody = + s""" + |@Override + |public void init(PluginContext ctx, Map extraConf) { + | String str = null; + | try (BufferedReader reader = + | new BufferedReader(new InputStreamReader(new FileInputStream($tempFileName)))) { + | str = reader.readLine(); + | } catch (IOException e) { + | throw new RuntimeException(e); + | } finally { + | assert str == "SparkPluginTest"; + | } + |} + """.stripMargin + + val compiledExecutorPlugin = TestUtils.createCompiledClass( + "TestExecutorPlugin", + tempDir, + "", + null, + Seq.empty, + importStatements, + Seq("ExecutorPlugin"), + executorPluginCodeBody) + + val thisClassPath = + sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL) + val compiledSparkPlugin = TestUtils.createCompiledClass( + "TestSparkPlugin", + tempDir, + "", + null, + Seq(tempDir.toURI.toURL) ++ thisClassPath, + importStatements, + Seq("SparkPlugin"), + sparkPluginCodeBody) + + val jarUrl = TestUtils.createJar( + Seq(compiledSparkPlugin, compiledExecutorPlugin), + new java.io.File(tempDir, "testplugin.jar")) + + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val args = Seq( + "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"), + "--name", "testApp", + "--master", "local-cluster[1,1,1024]", + "--conf", "spark.plugins=TestSparkPlugin", + "--conf", "spark.ui.enabled=false", + "--jars", jarUrl.toString, + "--files", tempFile.toString, + unusedJar.toString) + runSparkSubmit(args, timeout = 10.seconds) + } + private def testRemoteResources( enableHttpFs: Boolean, blacklistSchemes: Seq[String] = Nil): Unit = { From b571f4cd7c945e0c14b6d7aed7b179946eb8f4c2 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 29 Jun 2020 12:13:42 +0900 Subject: [PATCH 2/8] Modified testcases. 
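A note on the FileInputStream change below (an editorial illustration, not part of the original commit message): the plugin source in the test is built with Scala string interpolation, so the file name has to be quoted inside the template, otherwise the generated Java does not compile.

  // template: new FileInputStream($tempFileName)    -> generates: new FileInputStream(test.txt)    (invalid Java)
  // template: new FileInputStream("$tempFileName")  -> generates: new FileInputStream("test.txt")  (compiles)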
--- .../java/test/org/apache/spark/JavaSparkContextSuite.java | 7 +++++-- .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 5 +++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java index 0f489fb219010..b188ee16b97d0 100644 --- a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java +++ b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java @@ -28,6 +28,7 @@ import org.apache.spark.api.java.*; import org.apache.spark.*; +import org.apache.spark.util.Utils; /** * Java apps can use both Java-friendly JavaSparkContext and Scala SparkContext. @@ -35,14 +36,16 @@ public class JavaSparkContextSuite implements Serializable { @Test - public void javaSparkContext() { + public void javaSparkContext() throws IOException { + File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark"); + String dummyJarFile = File.createTempFile(tempDir.toString(), "jarFile").toString(); String[] jars = new String[] {}; java.util.Map environment = new java.util.HashMap<>(); new JavaSparkContext(new SparkConf().setMaster("local").setAppName("name")).stop(); new JavaSparkContext("local", "name", new SparkConf()).stop(); new JavaSparkContext("local", "name").stop(); - new JavaSparkContext("local", "name", "sparkHome", "jarFile").stop(); + new JavaSparkContext("local", "name", "sparkHome", dummyJarFile).stop(); new JavaSparkContext("local", "name", "sparkHome", jars).stop(); new JavaSparkContext("local", "name", "sparkHome", jars, environment).stop(); } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index f00de81958fdb..d2fa5c38c6435 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -568,7 +568,8 @@ class SparkSubmitSuite } } - val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar") + val dummyJarFile = TestUtils.createJarWithClasses(Seq.empty) + val clArgs2 = Seq("--class", "org.SomeClass", dummyJarFile.toString) val appArgs2 = new SparkSubmitArguments(clArgs2) val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2) assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS)) @@ -1254,7 +1255,7 @@ class SparkSubmitSuite |public void init(PluginContext ctx, Map extraConf) { | String str = null; | try (BufferedReader reader = - | new BufferedReader(new InputStreamReader(new FileInputStream($tempFileName)))) { + | new BufferedReader(new InputStreamReader(new FileInputStream("$tempFileName")))) { | str = reader.readLine(); | } catch (IOException e) { | throw new RuntimeException(e); From 2a47155ee437ece901007693d6abbfe26028b168 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 1 Jul 2020 02:08:12 +0900 Subject: [PATCH 3/8] Extended the timeout to 30 secs. 
--- .../test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index d2fa5c38c6435..03f67fd8d2118 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -1301,7 +1301,7 @@ class SparkSubmitSuite "--jars", jarUrl.toString, "--files", tempFile.toString, unusedJar.toString) - runSparkSubmit(args, timeout = 10.seconds) + runSparkSubmit(args, timeout = 30.seconds) } private def testRemoteResources( From c5c69d1399e8d59ef59fef2f9c6e84bfb400e6b6 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 6 Jul 2020 02:59:47 +0900 Subject: [PATCH 4/8] Fixed url issue. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 ++ .../main/scala/org/apache/spark/executor/Executor.scala | 9 +++++++-- .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 5 +++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 06ab56e48d8bc..e982f0db83a13 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -490,10 +490,12 @@ class SparkContext(config: SparkConf) extends Logging { if (jars != null) { jars.foreach(jar => addJar(jar, true)) } + _conf.set("spark.app.initial.jar.urls", addedJars.keys.toSeq.mkString(",")) if (files != null) { files.foreach(file => addFile(file, false, true)) } + _conf.set("spark.app.initial.file.urls", addedFiles.keys.toSeq.mkString(",")) _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key) .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 73e7e7fd4d77f..e2c47fea47721 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -158,9 +158,14 @@ private[spark] class Executor( // Jars and files specified by spark.jars and spark.files. 
private val initialUserJars = - Map(Utils.getUserJars(conf).map(jar => (jar, appStartTime)): _*) + conf.getOption("spark.app.initial.jar.urls").map { urls => + Map(urls.split(",").map(url => (url, appStartTime)): _*) + }.getOrElse(Map.empty) + private val initialUserFiles = - Map(Utils.getUserFiles(conf).map(file => (file, appStartTime)): _*) + conf.getOption("spark.app.initial.file.urls").map { urls => + Map(urls.split(",").map(url => (url, appStartTime)): _*) + }.getOrElse(Map.empty) updateDependencies(initialUserFiles, initialUserJars) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 03f67fd8d2118..f7f9c247b1372 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -1292,14 +1292,15 @@ class SparkSubmitSuite new java.io.File(tempDir, "testplugin.jar")) val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val unusedFile = Files.createTempFile(tempDir.toPath, "unused", null) val args = Seq( "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"), "--name", "testApp", "--master", "local-cluster[1,1,1024]", "--conf", "spark.plugins=TestSparkPlugin", "--conf", "spark.ui.enabled=false", - "--jars", jarUrl.toString, - "--files", tempFile.toString, + "--jars", jarUrl.toString + "," + unusedJar.toString, + "--files", tempFile.toString + "," + unusedFile.toString, unusedJar.toString) runSparkSubmit(args, timeout = 30.seconds) } From 21720ac0426922b88c653790bf9744a4c6742f98 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 6 Jul 2020 04:49:43 +0900 Subject: [PATCH 5/8] Replaced java.io.File with File simply. --- .../test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index f7f9c247b1372..9311c1cb13658 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -1289,7 +1289,7 @@ class SparkSubmitSuite val jarUrl = TestUtils.createJar( Seq(compiledSparkPlugin, compiledExecutorPlugin), - new java.io.File(tempDir, "testplugin.jar")) + new File(tempDir, "testplugin.jar")) val unusedJar = TestUtils.createJarWithClasses(Seq.empty) val unusedFile = Files.createTempFile(tempDir.toPath, "unused", null) From 449df2b92e5ad0dac6ea8dd83233450946a39df2 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 14 Jul 2020 13:44:31 +0900 Subject: [PATCH 6/8] Fixed for the empty value. 
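Background for this fix (an editorial illustration, not part of the original commit message): when an application is submitted without --jars or --files, the spark.app.initial.*.urls properties were previously set to an empty string, and splitting an empty string still yields one element, so each executor would then try to fetch a dependency from an empty URL. The guard below only sets the properties when something was actually added.

  "".split(",")             // Array("") -- one bogus empty entry, not Array()
  "a.jar,b.jar".split(",")  // Array("a.jar", "b.jar")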
---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e982f0db83a13..18ca9bcc57003 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -489,13 +489,17 @@ class SparkContext(config: SparkConf) extends Logging {
     // Add each JAR given through the constructor
     if (jars != null) {
       jars.foreach(jar => addJar(jar, true))
+      if (addedJars.nonEmpty) {
+        _conf.set("spark.app.initial.jar.urls", addedJars.keys.toSeq.mkString(","))
+      }
     }
-    _conf.set("spark.app.initial.jar.urls", addedJars.keys.toSeq.mkString(","))
 
     if (files != null) {
       files.foreach(file => addFile(file, false, true))
+      if (addedFiles.nonEmpty) {
+        _conf.set("spark.app.initial.file.urls", addedFiles.keys.toSeq.mkString(","))
+      }
     }
-    _conf.set("spark.app.initial.file.urls", addedFiles.keys.toSeq.mkString(","))
 
     _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
       .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))

From 5d65caf55c7b87fc0035444b60847e4037ad0f40 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Fri, 14 Aug 2020 02:10:46 +0900
Subject: [PATCH 7/8] Refactor code and expand comment.

---
 .../org/apache/spark/executor/Executor.scala   | 17 ++++++++---------
 .../apache/spark/deploy/SparkSubmitSuite.scala |  1 -
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3239be8a34c6d..9a191e1e84fad 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -222,17 +222,16 @@ private[spark] class Executor(
 
   private val appStartTime = conf.getLong("spark.app.startTime", 0)
 
-  // Jars and files specified by spark.jars and spark.files.
-  private val initialUserJars =
-    conf.getOption("spark.app.initial.jar.urls").map { urls =>
+  // To allow users to distribute plugins and their required files
+  // specified by --jars and --files on application submission, those jars/files should be
+  // downloaded and added to the class loader via updateDependencies.
+  // This should be done before plugin initialization below
+  // because executors search plugins from the class loader and initialize them.
+  private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
+    conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
       Map(urls.split(",").map(url => (url, appStartTime)): _*)
     }.getOrElse(Map.empty)
-
-  private val initialUserFiles =
-    conf.getOption("spark.app.initial.file.urls").map { urls =>
-      Map(urls.split(",").map(url => (url, appStartTime)): _*)
-    }.getOrElse(Map.empty)
-
+  }
   updateDependencies(initialUserFiles, initialUserJars)
 
   // Plugins need to load using a class loader that includes the executor's user classpath.
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 3d29f7606de7f..b5b3751439750 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1221,7 +1221,6 @@ class SparkSubmitSuite
     testRemoteResources(enableHttpFs = true, forceDownloadSchemes = Seq("*"))
   }
 
-
   test("SPARK-32119: Jars and files should be loaded when Executors launch for plugins") {
     val tempDir = Utils.createTempDir()
     val tempFileName = "test.txt"

From e57d053990e634fc6f03afbea761a9f32eeaee89 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Fri, 14 Aug 2020 15:07:40 +0900
Subject: [PATCH 8/8] Removed unused getUserFiles method from Utils.scala.

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 51eb739baf97d..35d60bb514405 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2598,15 +2598,6 @@ private[spark] object Utils extends Logging {
     conf.get(JARS).filter(_.nonEmpty)
   }
 
-  /**
-   * Return the files pointed by the "spark.files" property. Spark internally will distribute
-   * these files through file server. In the YARN mode, it will return an empty list, since YARN
-   * has its own mechanism to distribute files.
-   */
-  def getUserFiles(conf: SparkConf): Seq[String] = {
-    conf.get(FILES).filter(_.nonEmpty)
-  }
-
   /**
    * Return the local jar files which will be added to REPL's classpath. These jar files are
    * specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local by