[SPARK-32119][CORE] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars #28939

Closed · wants to merge 11 commits
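Background for the change: an executor-side plugin (spark.plugins) is initialized as soon as the executor starts, but on Standalone and Kubernetes the jars and files passed with --jars / --files were only fetched when the first task arrived, so a plugin class shipped via --jars could fail to load. Below is a minimal sketch of such a plugin, using the org.apache.spark.api.plugin API referenced in the test later in this PR; MySparkPlugin and settings.txt are illustrative names, not part of this PR.

import java.util.{Map => JMap}

import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Illustrative plugin: the class itself is shipped with --jars and its
// executor side reads a file shipped with --files during init().
class MySparkPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // Before this PR, neither the jar containing this class nor settings.txt
      // was guaranteed to be on the executor yet under Standalone/Kubernetes.
      val firstLine = scala.io.Source.fromFile("settings.txt").getLines().next()
      require(firstLine.nonEmpty, "settings.txt should not be empty")
    }
  }
}

Submitted with something like spark-submit --conf spark.plugins=MySparkPlugin --jars myplugin.jar --files settings.txt app.jar, init() now runs only after the submit-time jars and files have been fetched by the executor.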
21 changes: 15 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -379,6 +379,7 @@ class SparkContext(config: SparkConf) extends Logging {
try {
_conf = config.clone()
_conf.validateSettings()
_conf.set("spark.app.startTime", startTime.toString)

if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
@@ -487,11 +488,11 @@

// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
jars.foreach(jar => addJar(jar, true))
}

if (files != null) {
files.foreach(addFile)
files.foreach(file => addFile(file, false, true))
}

_executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
@@ -1495,7 +1496,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addFile(path: String): Unit = {
addFile(path, false)
addFile(path, false, false)
}

/**
@@ -1517,6 +1518,10 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addFile(path: String, recursive: Boolean): Unit = {
addFile(path, recursive, false)
}

private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
val uri = new Path(path).toUri
val schemeCorrectedURI = uri.getScheme match {
case null => new File(path).getCanonicalFile.toURI
@@ -1554,7 +1559,7 @@
path
}
}
val timestamp = System.currentTimeMillis
val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added file $path at $key with timestamp $timestamp")
// Fetch the file locally so that closures which are run on the driver can still use the
@@ -1564,7 +1569,7 @@ class SparkContext(config: SparkConf) extends Logging {
postEnvironmentUpdate()
} else {
logWarning(s"The path $path has been added already. Overwriting of added paths " +
"is not supported in the current version.")
"is not supported in the current version.")
}
}

@@ -1835,6 +1840,10 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addJar(path: String): Unit = {
addJar(path, false)
}

private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
def addLocalJarFile(file: File): String = {
try {
if (!file.exists()) {
@@ -1899,7 +1908,7 @@
}
}
if (key != null) {
val timestamp = System.currentTimeMillis
val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
postEnvironmentUpdate()
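The driver-side idea in the hunks above: addedJars/addedFiles map each artifact to a timestamp, and putIfAbsent enforces the "a path can be added only once" note. Artifacts passed at submit time are now stamped with startTime (also published as spark.app.startTime) rather than the wall-clock time of the addJar call, so an executor that fetched them at startup sees identical timestamps later and does not treat them as new. A small self-contained sketch of that bookkeeping, modeled loosely on SparkContext.addedJars:

import scala.collection.concurrent.TrieMap

object AddedJarsSketch {
  // Simplified stand-in for SparkContext.addedJars: path -> timestamp.
  private val addedJars = TrieMap.empty[String, Long]
  private val startTime = System.currentTimeMillis()

  def addJar(path: String, addedOnSubmit: Boolean): Unit = {
    // Submit-time jars get the application start time; addJar() calls made
    // later during the application get the current wall-clock time.
    val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis()
    if (addedJars.putIfAbsent(path, timestamp).isEmpty) {
      println(s"Added JAR $path with timestamp $timestamp")
    } else {
      // Matches the "a path can be added only once" note above.
      println(s"$path has already been added; ignoring")
    }
  }
}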
13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -179,11 +179,18 @@ private[spark] object TestUtils {
destDir: File,
toStringValue: String = "",
baseClass: String = null,
classpathUrls: Seq[URL] = Seq.empty): File = {
classpathUrls: Seq[URL] = Seq.empty,
preClassDefinitionBlock: String = "",
implementsClasses: Seq[String] = Seq.empty,
extraCodeBody: String = ""): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val implementsText = implementsClasses.map(", " + _).mkString
val sourceFile = new JavaSourceFromString(className,
"public class " + className + extendsText + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + toStringValue + "\"; }}")
preClassDefinitionBlock +
"public class " + className + extendsText + " implements java.io.Serializable" +
implementsText + " {" +
" @Override public String toString() { return \"" + toStringValue + "\"; }" +
extraCodeBody + " }")
createCompiledClass(className, destDir, sourceFile, classpathUrls)
}

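For illustration, here is how the new createCompiledClass parameters compose. The call is hypothetical (destDir is an assumed temp directory, and TestUtils is private[spark], so this only works from Spark's own test sources); the generated source in the comment follows directly from the string concatenation above:

import java.nio.file.Files

val destDir = Files.createTempDirectory("testclasses").toFile
val classFile = TestUtils.createCompiledClass(
  "Foo",
  destDir,
  toStringValue = "foo",
  preClassDefinitionBlock = "import java.util.*;",
  implementsClasses = Seq("Runnable"),
  extraCodeBody = " public void run() {}")

// Generated Java source, pretty-printed:
//   import java.util.*;
//   public class Foo implements java.io.Serializable, Runnable {
//     @Override public String toString() { return "foo"; }
//     public void run() {}
//   }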
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -154,6 +154,16 @@ private[spark] class Executor(
// for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
env.serializerManager.setDefaultClassLoader(replClassLoader)

private val appStartTime = conf.getLong("spark.app.startTime", 0)

// Jars and files specified by spark.jars and spark.files.
private val initialUserJars =
Map(Utils.getUserJars(conf).map(jar => (jar, appStartTime)): _*)
private val initialUserFiles =
Map(Utils.getUserFiles(conf).map(file => (file, appStartTime)): _*)

updateDependencies(initialUserFiles, initialUserJars)

// Plugins need to load using a class loader that includes the executor's user classpath
private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) {
PluginContainer(env, resources.asJava)
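On the executor side, the block above seeds the dependency bookkeeping with the spark.jars / spark.files entries (stamped with spark.app.startTime) and fetches them before PluginContainer is created. A rough self-contained sketch of the idea; the real logic lives in Executor.updateDependencies, and the names and stubbed fetch here are simplifications:

import scala.collection.mutable

object ExecutorStartupSketch {
  // path -> timestamp of the version this executor has already fetched.
  private val currentJars = mutable.Map.empty[String, Long]

  // Fetch every jar whose driver-side timestamp is newer than what we hold.
  def updateDependencies(newJars: Map[String, Long]): Unit = synchronized {
    for ((path, timestamp) <- newJars
         if currentJars.getOrElse(path, -1L) < timestamp) {
      println(s"Fetching $path (timestamp $timestamp)")  // Spark downloads it and adds it to the class loader
      currentJars(path) = timestamp
    }
  }

  def main(args: Array[String]): Unit = {
    val appStartTime = 1593400000000L  // stands in for spark.app.startTime
    // Jars listed in spark.jars, stamped with the app start time by the driver.
    val initialUserJars = Map("file:/opt/app/myplugin.jar" -> appStartTime)

    // With this PR the fetch happens at executor startup, before plugins load...
    updateDependencies(initialUserJars)
    // ...and when a task later reports the same (jar, timestamp) pair, nothing is re-fetched.
    updateDependencies(initialUserJars)
  }
}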
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2572,6 +2572,15 @@ private[spark] object Utils extends Logging {
conf.get(JARS).filter(_.nonEmpty)
}

/**
* Return the files pointed to by the "spark.files" property. Spark internally will distribute
* these files through the file server. In YARN mode, it will return an empty list, since YARN
* has its own mechanism to distribute files.
*/
def getUserFiles(conf: SparkConf): Seq[String] = {
conf.get(FILES).filter(_.nonEmpty)
}

/**
* Return the local jar files which will be added to REPL's classpath. These jar files are
* specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local by
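A quick usage sketch of what the new helper returns. Utils is private[spark], so the equivalent conf read is shown inline; the file paths are made up:

import org.apache.spark.SparkConf

object UserFilesSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.files", "/tmp/settings.txt,/tmp/lookup.csv")

    // Equivalent of Utils.getUserFiles(conf): read spark.files and drop empty entries.
    val files = conf.get("spark.files", "").split(",").filter(_.nonEmpty).toSeq
    files.foreach(println)  // prints /tmp/settings.txt, then /tmp/lookup.csv
  }
}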
JavaSparkContextSuite.java
@@ -28,21 +28,24 @@

import org.apache.spark.api.java.*;
import org.apache.spark.*;
import org.apache.spark.util.Utils;

/**
* Java apps can use both Java-friendly JavaSparkContext and Scala SparkContext.
*/
public class JavaSparkContextSuite implements Serializable {

@Test
public void javaSparkContext() {
public void javaSparkContext() throws IOException {
File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
String dummyJarFile = File.createTempFile(tempDir.toString(), "jarFile").toString();
String[] jars = new String[] {};
java.util.Map<String, String> environment = new java.util.HashMap<>();

new JavaSparkContext(new SparkConf().setMaster("local").setAppName("name")).stop();
new JavaSparkContext("local", "name", new SparkConf()).stop();
new JavaSparkContext("local", "name").stop();
new JavaSparkContext("local", "name", "sparkHome", "jarFile").stop();
new JavaSparkContext("local", "name", "sparkHome", dummyJarFile).stop();
new JavaSparkContext("local", "name", "sparkHome", jars).stop();
new JavaSparkContext("local", "name", "sparkHome", jars, environment).stop();
}
SparkSubmitSuite.scala
@@ -568,7 +568,8 @@ class SparkSubmitSuite
}
}

val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
val dummyJarFile = TestUtils.createJarWithClasses(Seq.empty)
val clArgs2 = Seq("--class", "org.SomeClass", dummyJarFile.toString)
val appArgs2 = new SparkSubmitArguments(clArgs2)
val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
@@ -1218,6 +1219,91 @@ class SparkSubmitSuite
testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*"))
}


test("SPARK-32119: Jars and files should be loaded when Executors launch for plugins") {
val tempDir = Utils.createTempDir()
val tempFileName = "test.txt"
val tempFile = new File(tempDir, tempFileName)

// scalastyle:off println
Utils.tryWithResource {
new PrintWriter(tempFile)
} { writer =>
writer.println("SparkPluginTest")
}
// scalastyle:on println

val importStatements =
"""
|import java.io.*;
|import java.util.Map;
|import org.apache.spark.api.plugin.*;
""".stripMargin
val sparkPluginCodeBody =
"""
|@Override
|public ExecutorPlugin executorPlugin() {
| return new TestExecutorPlugin();
|}
|
|@Override
|public DriverPlugin driverPlugin() { return null; }
""".stripMargin
val executorPluginCodeBody =
s"""
|@Override
|public void init(PluginContext ctx, Map<String, String> extraConf) {
| String str = null;
| try (BufferedReader reader =
| new BufferedReader(new InputStreamReader(new FileInputStream("$tempFileName")))) {
| str = reader.readLine();
| } catch (IOException e) {
| throw new RuntimeException(e);
| } finally {
| assert str == "SparkPluginTest";
| }
|}
""".stripMargin

Review thread on the `new FileInputStream("$tempFileName")` line above:

Reviewer (Member): Oh, interesting. Previously, this test case succeeds with "test.txt"?

@sarutak (Member, Author), Jun 29, 2020: Actually, I replaced "test.txt" with $tempFileName just before the first push, so it's a mis-replacement.

val compiledExecutorPlugin = TestUtils.createCompiledClass(
"TestExecutorPlugin",
tempDir,
"",
null,
Seq.empty,
importStatements,
Seq("ExecutorPlugin"),
executorPluginCodeBody)

val thisClassPath =
sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
val compiledSparkPlugin = TestUtils.createCompiledClass(
"TestSparkPlugin",
tempDir,
"",
null,
Seq(tempDir.toURI.toURL) ++ thisClassPath,
importStatements,
Seq("SparkPlugin"),
sparkPluginCodeBody)

val jarUrl = TestUtils.createJar(
Seq(compiledSparkPlugin, compiledExecutorPlugin),
new java.io.File(tempDir, "testplugin.jar"))

val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local-cluster[1,1,1024]",
"--conf", "spark.plugins=TestSparkPlugin",
"--conf", "spark.ui.enabled=false",
"--jars", jarUrl.toString,
"--files", tempFile.toString,
unusedJar.toString)
runSparkSubmit(args, timeout = 10.seconds)
}

private def testRemoteResources(
enableHttpFs: Boolean,
blacklistSchemes: Seq[String] = Nil): Unit = {