[SPARK-32119][CORE] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars

### What changes were proposed in this pull request?

This PR changes `Executor` to load jars and files added by --jars and --files during executor initialization.
To avoid downloading those jars/files twice, they are associated with `startTime` as their upload timestamp.
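
For illustration, the dedup relies on the executor's existing timestamp check: a dependency is fetched only when the incoming timestamp is newer than the one already recorded, so stamping submit-time jars/files with `startTime` means the first task's copy of the same URL (carrying the same timestamp) is skipped. A self-contained sketch of that check (hypothetical `maybeFetch` helper, not the verbatim `Executor` code):

```scala
import scala.collection.mutable

object TimestampDedupSketch {
  private val fetched = mutable.Map.empty[String, Long]

  // Fetch only when the incoming timestamp is newer than what was recorded.
  def maybeFetch(url: String, timestamp: Long)(fetch: String => Unit): Unit = {
    if (fetched.getOrElse(url, -1L) < timestamp) {
      fetch(url)
      fetched(url) = timestamp
    }
  }

  def main(args: Array[String]): Unit = {
    val startTime = 1597363200000L
    // Executor start-up: jars added by --jars are stamped with startTime.
    maybeFetch("spark://driver:7077/jars/plugin.jar", startTime)(u => println(s"fetch $u"))
    // The first task ships the same URL with the same startTime stamp,
    // so nothing is downloaded a second time.
    maybeFetch("spark://driver:7077/jars/plugin.jar", startTime)(u => println(s"fetch $u"))
  }
}
```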

### Why are the changes needed?

ExecutorPlugin can't work with Standalone Cluster and Kubernetes
when a jar containing plugins, together with the files those plugins use, is added via the --jars and --files options of spark-submit.

This is because jars and files added by --jars and --files are not loaded during executor initialization.
I confirmed it works on YARN because there jars/files are distributed via the distributed cache.
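
For context, a plugin that hits this might look as follows (a minimal sketch; the class and file names are hypothetical, while `SparkPlugin`/`ExecutorPlugin`/`PluginContext` are the real `org.apache.spark.api.plugin` interfaces). Its `init` reads a file shipped with --files, so the file must already be in the executor's working directory when plugins are initialized:

```scala
import java.nio.file.{Files, Paths}

import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

class MyFilePlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: java.util.Map[String, String]): Unit = {
      // Before this fix, this failed on Standalone/Kubernetes because --files
      // dependencies were only fetched when the first task was launched.
      val line = Files.readAllLines(Paths.get("plugin-config.txt")).get(0)
      require(line == "expected-content", s"unexpected plugin config: $line")
    }
  }
}
```

Submitted with something like `--conf spark.plugins=MyFilePlugin --jars myfileplugin.jar --files plugin-config.txt`, `init` runs during executor startup, before any task has had a chance to trigger dependency download.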

### Does this PR introduce _any_ user-facing change?

Yes. Jars/files added by --jars and --files are now downloaded by each executor during initialization.

### How was this patch tested?

Added a new test case.

Closes apache#28939 from sarutak/fix-plugin-issue.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
sarutak authored and Mridul Muralidharan committed Aug 14, 2020
1 parent c6be207 commit 1a4c8f7
Showing 4 changed files with 122 additions and 9 deletions.
27 changes: 21 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -384,6 +384,7 @@ class SparkContext(config: SparkConf) extends Logging {
try {
_conf = config.clone()
_conf.validateSettings()
_conf.set("spark.app.startTime", startTime.toString)

if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
@@ -492,11 +493,17 @@

// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
jars.foreach(jar => addJar(jar, true))
if (addedJars.nonEmpty) {
_conf.set("spark.app.initial.jar.urls", addedJars.keys.toSeq.mkString(","))
}
}

if (files != null) {
files.foreach(addFile)
files.foreach(file => addFile(file, false, true))
if (addedFiles.nonEmpty) {
_conf.set("spark.app.initial.file.urls", addedFiles.keys.toSeq.mkString(","))
}
}

_executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
@@ -1500,7 +1507,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addFile(path: String): Unit = {
addFile(path, false)
addFile(path, false, false)
}

/**
@@ -1522,6 +1529,10 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addFile(path: String, recursive: Boolean): Unit = {
addFile(path, recursive, false)
}

private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
val uri = new Path(path).toUri
val schemeCorrectedURI = uri.getScheme match {
case null => new File(path).getCanonicalFile.toURI
@@ -1559,7 +1570,7 @@ class SparkContext(config: SparkConf) extends Logging {
path
}
}
val timestamp = System.currentTimeMillis
val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added file $path at $key with timestamp $timestamp")
// Fetch the file locally so that closures which are run on the driver can still use the
@@ -1569,7 +1580,7 @@ class SparkContext(config: SparkConf) extends Logging {
postEnvironmentUpdate()
} else {
logWarning(s"The path $path has been added already. Overwriting of added paths " +
"is not supported in the current version.")
"is not supported in the current version.")
}
}

@@ -1840,6 +1851,10 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addJar(path: String): Unit = {
addJar(path, false)
}

private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
def addLocalJarFile(file: File): String = {
try {
if (!file.exists()) {
@@ -1904,7 +1919,7 @@ class SparkContext(config: SparkConf) extends Logging {
}
}
if (key != null) {
val timestamp = System.currentTimeMillis
val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
postEnvironmentUpdate()
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -220,6 +220,20 @@ private[spark] class Executor(

heartbeater.start()

private val appStartTime = conf.getLong("spark.app.startTime", 0)

// To allow users to distribute plugins and their required files
// specified by --jars and --files on application submission, those jars/files should be
// downloaded and added to the class loader via updateDependencies.
// This should be done before plugin initialization below
// because executors search plugins from the class loader and initialize them.
private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
Map(urls.split(",").map(url => (url, appStartTime)): _*)
}.getOrElse(Map.empty)
}
updateDependencies(initialUserFiles, initialUserJars)

// Plugins need to load using a class loader that includes the executor's user classpath.
// Plugins also needs to be initialized after the heartbeater started
// to avoid blocking to send heartbeat (see SPARK-32175).
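
Taken in isolation, the bootstrap added above amounts to the following (a self-contained sketch assuming the two `spark.app.initial.*.urls` properties were set by `SparkContext` as in the first hunk; `initialDeps` is a made-up helper name):

```scala
import org.apache.spark.SparkConf

object InitialDepsSketch {
  // Rebuild a URL -> timestamp map the way the new Executor code does,
  // pairing every submit-time URL with the application's startTime.
  def initialDeps(conf: SparkConf, key: String, appStartTime: Long): Map[String, Long] =
    conf.getOption(s"spark.app.initial.$key.urls")
      .map(_.split(",").map(url => url -> appStartTime).toMap)
      .getOrElse(Map.empty[String, Long])

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.app.startTime", "1597363200000")
      .set("spark.app.initial.jar.urls", "spark://driver:7077/jars/plugin.jar")
    val appStartTime = conf.getLong("spark.app.startTime", 0L)
    println(initialDeps(conf, "jar", appStartTime))
    // Map(spark://driver:7077/jars/plugin.jar -> 1597363200000)
    // The real Executor passes these maps to updateDependencies() before
    // PluginContainer initializes any plugins.
  }
}
```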
7 changes: 5 additions & 2 deletions core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
@@ -28,21 +28,24 @@

import org.apache.spark.api.java.*;
import org.apache.spark.*;
import org.apache.spark.util.Utils;

/**
* Java apps can use both Java-friendly JavaSparkContext and Scala SparkContext.
*/
public class JavaSparkContextSuite implements Serializable {

@Test
public void javaSparkContext() {
public void javaSparkContext() throws IOException {
File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
String dummyJarFile = File.createTempFile(tempDir.toString(), "jarFile").toString();
String[] jars = new String[] {};
java.util.Map<String, String> environment = new java.util.HashMap<>();

new JavaSparkContext(new SparkConf().setMaster("local").setAppName("name")).stop();
new JavaSparkContext("local", "name", new SparkConf()).stop();
new JavaSparkContext("local", "name").stop();
new JavaSparkContext("local", "name", "sparkHome", "jarFile").stop();
new JavaSparkContext("local", "name", "sparkHome", dummyJarFile).stop();
new JavaSparkContext("local", "name", "sparkHome", jars).stop();
new JavaSparkContext("local", "name", "sparkHome", jars, environment).stop();
}
Expand Down
83 changes: 82 additions & 1 deletion core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,7 +570,8 @@ class SparkSubmitSuite
}
}

val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
val dummyJarFile = TestUtils.createJarWithClasses(Seq.empty)
val clArgs2 = Seq("--class", "org.SomeClass", dummyJarFile.toString)
val appArgs2 = new SparkSubmitArguments(clArgs2)
val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
@@ -1220,6 +1221,86 @@ class SparkSubmitSuite
testRemoteResources(enableHttpFs = true, forceDownloadSchemes = Seq("*"))
}

test("SPARK-32119: Jars and files should be loaded when Executors launch for plugins") {
val tempDir = Utils.createTempDir()
val tempFileName = "test.txt"
val tempFile = new File(tempDir, tempFileName)

// scalastyle:off println
Utils.tryWithResource {
new PrintWriter(tempFile)
} { writer =>
writer.println("SparkPluginTest")
}
// scalastyle:on println

val sparkPluginCodeBody =
"""
|@Override
|public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
| return new TestExecutorPlugin();
|}
|
|@Override
|public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
""".stripMargin
val executorPluginCodeBody =
s"""
|@Override
|public void init(
| org.apache.spark.api.plugin.PluginContext ctx,
| java.util.Map<String, String> extraConf) {
| String str = null;
| try (java.io.BufferedReader reader =
| new java.io.BufferedReader(new java.io.InputStreamReader(
| new java.io.FileInputStream("$tempFileName")))) {
| str = reader.readLine();
| } catch (java.io.IOException e) {
| throw new RuntimeException(e);
| } finally {
|   assert "SparkPluginTest".equals(str);
| }
|}
""".stripMargin

val compiledExecutorPlugin = TestUtils.createCompiledClass(
"TestExecutorPlugin",
tempDir,
"",
null,
Seq.empty,
Seq("org.apache.spark.api.plugin.ExecutorPlugin"),
executorPluginCodeBody)

val thisClassPath =
sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
val compiledSparkPlugin = TestUtils.createCompiledClass(
"TestSparkPlugin",
tempDir,
"",
null,
Seq(tempDir.toURI.toURL) ++ thisClassPath,
Seq("org.apache.spark.api.plugin.SparkPlugin"),
sparkPluginCodeBody)

val jarUrl = TestUtils.createJar(
Seq(compiledSparkPlugin, compiledExecutorPlugin),
new File(tempDir, "testplugin.jar"))

val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val unusedFile = Files.createTempFile(tempDir.toPath, "unused", null)
val args = Seq(
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local-cluster[1,1,1024]",
"--conf", "spark.plugins=TestSparkPlugin",
"--conf", "spark.ui.enabled=false",
"--jars", jarUrl.toString + "," + unusedJar.toString,
"--files", tempFile.toString + "," + unusedFile.toString,
unusedJar.toString)
runSparkSubmit(args)
}

private def testRemoteResources(
enableHttpFs: Boolean,
forceDownloadSchemes: Seq[String] = Nil): Unit = {
