[SPARK-24377][Spark Submit] make --py-files work in non pyspark application #21420

Closed · wants to merge 3 commits
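In short: before this patch, spark-submit rejected --py-files whenever the primary resource was not a Python script; with it, the files are distributed and exposed through spark.submit.pyFiles for any application type. A minimal sketch of a submission this enables, written in the style of the tests below (the main class, egg path, and jar name are hypothetical):

```scala
// Hypothetical example: a JVM application that still ships Python
// dependencies. Previously SparkSubmitArguments failed this with
// "--py-files given but primary resource is not a Python script".
val clArgs = Seq(
  "--class", "com.example.JavaApp",        // hypothetical JVM main class
  "--name", "testApp",
  "--master", "yarn",
  "--deploy-mode", "client",
  "--py-files", "s3a://bucket/deps.egg",   // hypothetical Python dependency
  "app.jar")                               // hypothetical primary resource
val appArgs = new SparkSubmitArguments(clArgs)
```

The validation that used to reject this combination is the pyFiles/isPython check deleted in SparkSubmitArguments.scala below.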
11 changes: 4 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -430,18 +430,15 @@ private[spark] class SparkSubmit extends Logging {
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
-        if (clusterManager != YARN) {
-          // The YARN backend distributes the primary file differently, so don't merge it.
-          args.files = mergeFileLists(args.files, args.primaryResource)
Member: Eh @jerryshao why did we remove this?

Contributor Author (@jerryshao): It duplicates the code below; you can check the original code.

-        }
}
if (clusterManager != YARN) {
// The YARN backend handles python files differently, so don't merge the lists.
args.files = mergeFileLists(args.files, args.pyFiles)
}
-      if (localPyFiles != null) {
-        sparkConf.set("spark.submit.pyFiles", localPyFiles)
-      }
}

+    if (localPyFiles != null) {
+      sparkConf.set("spark.submit.pyFiles", localPyFiles)
+    }

// In YARN mode for an R app, add the SparkR package archive and the R package
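Taking the hunk and the thread together: the primaryResource merge that the author calls duplicated is dropped, and the spark.submit.pyFiles assignment moves out of the isPython branch so it runs for every application type. A simplified paraphrase of the resulting flow (condensed from the diff above, not verbatim upstream code):

```scala
// Condensed paraphrase of the py-files handling after this change.
if (args.isPython && deployMode == CLIENT) {
  // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
  args.mainClass = "org.apache.spark.deploy.PythonRunner"
  args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
  if (clusterManager != YARN) {
    // The YARN backend handles python files differently, so don't merge the lists.
    args.files = mergeFileLists(args.files, args.pyFiles)
  }
}

// No longer gated on isPython: any application can carry py-files.
if (localPyFiles != null) {
  sparkConf.set("spark.submit.pyFiles", localPyFiles)
}
```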
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -182,6 +182,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
files = Option(files).orElse(sparkProperties.get("spark.files")).orNull
+    pyFiles = Option(pyFiles).orElse(sparkProperties.get("spark.submit.pyFiles")).orNull
ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
ivySettingsPath = sparkProperties.get("spark.jars.ivySettings")
packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
@@ -280,9 +281,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
numExecutors != null && Try(numExecutors.toInt).getOrElse(-1) <= 0) {
error("Number of executors must be a positive number")
}
-    if (pyFiles != null && !isPython) {
-      error("--py-files given but primary resource is not a Python script")
-    }

if (master.startsWith("yarn")) {
val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
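The added pyFiles line gives --py-files the same fallback behavior as jars and files: an explicit flag wins, otherwise spark.submit.pyFiles set via --conf or spark-defaults.conf is picked up; the check deleted in the second hunk is what used to turn that combination into an error for JVM applications. A minimal standalone sketch of the Option.orElse pattern (map contents hypothetical):

```scala
// Sketch of the fallback in loadEnvironmentArguments(): the command-line
// value wins; otherwise the Spark property fills in; otherwise null.
val sparkProperties = Map("spark.submit.pyFiles" -> "s3a://bucket/deps.egg") // hypothetical
var pyFiles: String = null // no --py-files flag was given
pyFiles = Option(pyFiles).orElse(sparkProperties.get("spark.submit.pyFiles")).orNull
assert(pyFiles == "s3a://bucket/deps.egg")
```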
46 changes: 44 additions & 2 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -771,9 +771,13 @@ class SparkSubmitSuite
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))

// Test remote python files
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
val f4 = File.createTempFile("test-submit-remote-python-files", "", tmpDir)
+    val pyFile1 = File.createTempFile("file1", ".py", tmpDir)
+    val pyFile2 = File.createTempFile("file2", ".py", tmpDir)
val writer4 = new PrintWriter(f4)
val remotePyFiles = "hdfs:///tmp/file1.py,hdfs:///tmp/file2.py"
val remotePyFiles = s"s3a://${pyFile1.getAbsolutePath},s3a://${pyFile2.getAbsolutePath}"
writer4.println("spark.submit.pyFiles " + remotePyFiles)
writer4.close()
val clArgs4 = Seq(
@@ -783,7 +787,7 @@
"hdfs:///tmp/mister.py"
)
val appArgs4 = new SparkSubmitArguments(clArgs4)
-    val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4)
+    val (_, _, conf4, _) = submit.prepareSubmitEnvironment(appArgs4, conf = Some(hadoopConf))
// Should not format python path for yarn cluster mode
conf4.get("spark.submit.pyFiles") should be(Utils.resolveURIs(remotePyFiles))
}
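Note on the hunk above: the test previously pointed spark.submit.pyFiles at bare hdfs:///tmp URIs, which no real filesystem in the test environment could serve. It now registers the suite's fake S3 filesystem (updateConfWithFakeS3Fs) and uses s3a:// URIs backed by actual temp files, presumably because after this change the submission path resolves the configured py-files against a real Hadoop FileSystem even in yarn cluster mode, where, as the comment says, the python paths must not be rewritten.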
@@ -1093,6 +1097,44 @@ class SparkSubmitSuite
assert(exception.getMessage() === "hello")
}

test("support --py-files/spark.submit.pyFiles in non pyspark application") {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)

val tmpDir = Utils.createTempDir()
val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir)

val args = Seq(
"--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
"--name", "testApp",
"--master", "yarn",
"--deploy-mode", "client",
"--py-files", s"s3a://${pyFile.getAbsolutePath}",
"spark-internal"
)

val appArgs = new SparkSubmitArguments(args)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, conf = Some(hadoopConf))

conf.get(PY_FILES.key) should be (s"s3a://${pyFile.getAbsolutePath}")
conf.get("spark.submit.pyFiles") should (startWith("/"))

// Verify "spark.submit.pyFiles"
val args1 = Seq(
"--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
"--name", "testApp",
"--master", "yarn",
"--deploy-mode", "client",
"--conf", s"spark.submit.pyFiles=s3a://${pyFile.getAbsolutePath}",
"spark-internal"
)

val appArgs1 = new SparkSubmitArguments(args1)
val (_, _, conf1, _) = submit.prepareSubmitEnvironment(appArgs1, conf = Some(hadoopConf))

conf1.get(PY_FILES.key) should be (s"s3a://${pyFile.getAbsolutePath}")
conf1.get("spark.submit.pyFiles") should (startWith("/"))
}
}
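A note on the paired assertions in the new test: assuming PY_FILES here is the internal spark.yarn.dist.pyFiles constant from core's config package, each case checks that the original s3a:// URI is preserved for YARN to distribute, while spark.submit.pyFiles is rewritten to an absolute local path because client mode downloads the remote egg (through the fake S3 filesystem) before launching the driver.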

object SparkSubmitSuite extends SparkFunSuite with TimeLimits {