Fix ExecutorPlugin to work with Standalone Cluster.
sarutak committed Jun 28, 2020
1 parent 8c44d74 commit 69a9be1
Showing 5 changed files with 129 additions and 9 deletions.
21 changes: 15 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -379,6 +379,7 @@ class SparkContext(config: SparkConf) extends Logging {
try {
_conf = config.clone()
_conf.validateSettings()
_conf.set("spark.app.startTime", startTime.toString)

if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
@@ -487,11 +488,11 @@

// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
jars.foreach(jar => addJar(jar, true))
}

if (files != null) {
files.foreach(addFile)
files.foreach(file => addFile(file, false, true))
}

_executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
@@ -1495,7 +1496,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addFile(path: String): Unit = {
addFile(path, false)
addFile(path, false, false)
}

/**
@@ -1517,6 +1518,10 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addFile(path: String, recursive: Boolean): Unit = {
addFile(path, recursive, false)
}

private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
val uri = new Path(path).toUri
val schemeCorrectedURI = uri.getScheme match {
case null => new File(path).getCanonicalFile.toURI
@@ -1554,7 +1559,7 @@ class SparkContext(config: SparkConf) extends Logging {
path
}
}
val timestamp = System.currentTimeMillis
val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added file $path at $key with timestamp $timestamp")
// Fetch the file locally so that closures which are run on the driver can still use the
@@ -1564,7 +1569,7 @@
postEnvironmentUpdate()
} else {
logWarning(s"The path $path has been added already. Overwriting of added paths " +
"is not supported in the current version.")
"is not supported in the current version.")
}
}

@@ -1835,6 +1840,10 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addJar(path: String): Unit = {
addJar(path, false)
}

private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
def addLocalJarFile(file: File): String = {
try {
if (!file.exists()) {
@@ -1899,7 +1908,7 @@
}
}
if (key != null) {
val timestamp = System.currentTimeMillis
val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
postEnvironmentUpdate()
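
Taken together, the SparkContext changes route the constructor path (spark.jars / spark.files) and the public addFile/addJar API through private overloads that differ only in the timestamp they record. A minimal, self-contained sketch of that rule follows; the object and method names are invented for illustration.

// Sketch only: dependencies registered at submit time reuse the application
// start time (also published as spark.app.startTime in the hunk above), while
// later additions are stamped with the current time.
object TimestampRuleSketch {
  def timestampFor(addedOnSubmit: Boolean, appStartTime: Long): Long =
    if (addedOnSubmit) appStartTime else System.currentTimeMillis()

  def main(args: Array[String]): Unit = {
    val appStartTime = System.currentTimeMillis()
    println(timestampFor(addedOnSubmit = true, appStartTime = appStartTime))   // prints appStartTime
    println(timestampFor(addedOnSubmit = false, appStartTime = appStartTime))  // prints the call time
  }
}
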
13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -179,11 +179,18 @@ private[spark] object TestUtils {
destDir: File,
toStringValue: String = "",
baseClass: String = null,
classpathUrls: Seq[URL] = Seq.empty): File = {
classpathUrls: Seq[URL] = Seq.empty,
preClassDefinitionBlock: String = "",
implementsClasses: Seq[String] = Seq.empty,
extraCodeBody: String = ""): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val implementsText = implementsClasses.map(", " + _).mkString
val sourceFile = new JavaSourceFromString(className,
"public class " + className + extendsText + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + toStringValue + "\"; }}")
preClassDefinitionBlock +
"public class " + className + extendsText + " implements java.io.Serializable" +
implementsText + " {" +
" @Override public String toString() { return \"" + toStringValue + "\"; }" +
extraCodeBody + " }")
createCompiledClass(className, destDir, sourceFile, classpathUrls)
}

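
The new createCompiledClass parameters splice extra Java text around the generated class. Below is a stand-alone sketch of how those pieces are concatenated; the parameter names mirror the diff, while the enclosing object and the demo values are illustrative.

// Sketch of the source-assembly logic added above: preClassDefinitionBlock is
// emitted before the class, implementsClasses extend the implements list, and
// extraCodeBody is placed inside the class body.
object GeneratedSourceSketch {
  def javaSource(
      className: String,
      toStringValue: String = "",
      baseClass: String = null,
      preClassDefinitionBlock: String = "",
      implementsClasses: Seq[String] = Seq.empty,
      extraCodeBody: String = ""): String = {
    val extendsText = Option(baseClass).map(c => s" extends $c").getOrElse("")
    val implementsText = implementsClasses.map(", " + _).mkString
    preClassDefinitionBlock +
      "public class " + className + extendsText + " implements java.io.Serializable" +
      implementsText + " {" +
      " @Override public String toString() { return \"" + toStringValue + "\"; }" +
      extraCodeBody + " }"
  }

  def main(args: Array[String]): Unit =
    println(javaSource("Demo", toStringValue = "demo", implementsClasses = Seq("Runnable"),
      extraCodeBody = " @Override public void run() {}"))
}
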
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -154,6 +154,16 @@ private[spark] class Executor(
// for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
env.serializerManager.setDefaultClassLoader(replClassLoader)

private val appStartTime = conf.getLong("spark.app.startTime", 0)

// Jars and files specified by spark.jars and spark.files.
private val initialUserJars =
Map(Utils.getUserJars(conf).map(jar => (jar, appStartTime)): _*)
private val initialUserFiles =
Map(Utils.getUserFiles(conf).map(file => (file, appStartTime)): _*)

updateDependencies(initialUserFiles, initialUserJars)

// Plugins need to load using a class loader that includes the executor's user classpath
private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) {
PluginContainer(env, resources.asJava)
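
On the executor side, the commit seeds the dependency bookkeeping from spark.jars and spark.files before plugins are constructed, using the application start time as the timestamp. A rough, self-contained analogue of that seeding step follows; the object name and the sample values are made up for the example.

// Sketch: pair every jar/file name from the submit-time configuration with the
// application start time, so the executor's bookkeeping matches the timestamps
// the driver recorded for the same dependencies.
object InitialDependencySketch {
  def initialDeps(paths: Seq[String], appStartTime: Long): Map[String, Long] =
    Map(paths.map(p => (p, appStartTime)): _*)

  def main(args: Array[String]): Unit = {
    val jars = Seq("plugins.jar", "common.jar")  // illustrative values
    initialDeps(jars, appStartTime = 1593302400000L)
      .foreach { case (path, ts) => println(s"$path -> $ts") }
  }
}
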
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2572,6 +2572,15 @@ private[spark] object Utils extends Logging {
conf.get(JARS).filter(_.nonEmpty)
}

/**
* Return the files pointed by the "spark.files" property. Spark internally will distribute
* these files through file server. In the YARN mode, it will return an empty list, since YARN
* has its own mechanism to distribute files.
*/
def getUserFiles(conf: SparkConf): Seq[String] = {
conf.get(FILES).filter(_.nonEmpty)
}

/**
* Return the local jar files which will be added to REPL's classpath. These jar files are
* specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local by
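
getUserFiles is a direct counterpart of the existing getUserJars, reading spark.files instead of spark.jars. A rough stand-alone analogue is sketched below; it operates on a raw comma-separated string rather than Spark's typed FILES config entry, so treat it as an approximation.

// Sketch of the behaviour: split the property value and drop empty entries,
// which is what filtering a sequence-typed config for non-empty strings does.
object UserFileListSketch {
  def userFiles(rawProperty: Option[String]): Seq[String] =
    rawProperty.toSeq.flatMap(_.split(",")).filter(_.nonEmpty)

  def main(args: Array[String]): Unit =
    println(userFiles(Some("data.txt,,extra.json")))  // List(data.txt, extra.json)
}
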
85 changes: 85 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1218,6 +1218,91 @@ class SparkSubmitSuite
testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*"))
}


test("SPARK-32119: Jars and files should be loaded when Executors launch for plugins") {
val tempDir = Utils.createTempDir()
val tempFileName = "test.txt"
val tempFile = new File(tempDir, tempFileName)

// scalastyle:off println
Utils.tryWithResource {
new PrintWriter(tempFile)
} { writer =>
writer.println("SparkPluginTest")
}
// scalastyle:on println

val importStatements =
"""
|import java.io.*;
|import java.util.Map;
|import org.apache.spark.api.plugin.*;
""".stripMargin
val sparkPluginCodeBody =
"""
|@Override
|public ExecutorPlugin executorPlugin() {
| return new TestExecutorPlugin();
|}
|
|@Override
|public DriverPlugin driverPlugin() { return null; }
""".stripMargin
val executorPluginCodeBody =
s"""
|@Override
|public void init(PluginContext ctx, Map<String, String> extraConf) {
| String str = null;
| try (BufferedReader reader =
| new BufferedReader(new InputStreamReader(new FileInputStream("$tempFileName")))) {
| str = reader.readLine();
| } catch (IOException e) {
| throw new RuntimeException(e);
| } finally {
| assert str == "SparkPluginTest";
| }
|}
""".stripMargin

val compiledExecutorPlugin = TestUtils.createCompiledClass(
"TestExecutorPlugin",
tempDir,
"",
null,
Seq.empty,
importStatements,
Seq("ExecutorPlugin"),
executorPluginCodeBody)

val thisClassPath =
sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
val compiledSparkPlugin = TestUtils.createCompiledClass(
"TestSparkPlugin",
tempDir,
"",
null,
Seq(tempDir.toURI.toURL) ++ thisClassPath,
importStatements,
Seq("SparkPlugin"),
sparkPluginCodeBody)

val jarUrl = TestUtils.createJar(
Seq(compiledSparkPlugin, compiledExecutorPlugin),
new java.io.File(tempDir, "testplugin.jar"))

val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
val args = Seq(
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local-cluster[1,1,1024]",
"--conf", "spark.plugins=TestSparkPlugin",
"--conf", "spark.ui.enabled=false",
"--jars", jarUrl.toString,
"--files", tempFile.toString,
unusedJar.toString)
runSparkSubmit(args, timeout = 10.seconds)
}

private def testRemoteResources(
enableHttpFs: Boolean,
blacklistSchemes: Seq[String] = Nil): Unit = {
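
For orientation, the Java source generated by the test corresponds roughly to the hand-written Scala plugin below. The class name, the hard-coded file name, and the error handling are illustrative and not part of the commit; only the SparkPlugin/ExecutorPlugin interfaces come from Spark's plugin API.

import java.util.{Map => JMap}
import scala.io.Source
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Sketch of an executor-side plugin that reads a file shipped with --files.
// With this commit, the file has already been fetched into the executor's
// working directory by the time init() runs, so the read below can succeed.
class FileReadingSparkPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      val source = Source.fromFile("test.txt")  // file name shipped via --files (illustrative)
      try {
        require(source.getLines().next() == "SparkPluginTest")
      } finally {
        source.close()
      }
    }
  }
}
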
