Merge pull request #1 from robert3005/fork/master
add py-requirements submit option
buckhx committed May 11, 2016
2 parents 82534d0 + ea6b89f commit 1d5d25f
Showing 5 changed files with 57 additions and 2 deletions.
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -356,16 +356,19 @@ object SparkSubmit {
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
if (clusterManager != YARN) {
// The YARN backend distributes the primary file differently, so don't merge it.
-  args.files = mergeFileLists(args.files, args.primaryResource)
+  args.files = mergeFileLists(args.files, args.primaryResource, args.pyRequirements)
}
}
if (clusterManager != YARN) {
// The YARN backend handles python files differently, so don't merge the lists.
-  args.files = mergeFileLists(args.files, args.pyFiles)
+  args.files = mergeFileLists(args.files, args.pyFiles, args.pyRequirements)
}
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
if (args.pyRequirements != null) {
sysProps("spark.submit.pyRequirements") = args.pyRequirements
}
}

// In YARN mode for an R app, add the SparkR package archive and the R package
@@ -542,6 +545,10 @@ object SparkSubmit {
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}

if (args.pyRequirements != null) {
sysProps("spark.submit.pyRequirements") = args.pyRequirements
}
}

// assure a keytab is available from any place in a JVM
@@ -593,6 +600,9 @@ object SparkSubmit {
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
if (args.pyRequirements != null) {
sysProps("spark.submit.pyRequirements") = args.pyRequirements
}
} else {
childArgs += (args.primaryResource, args.mainClass)
}
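The sysProps("spark.submit.pyRequirements") assignments above mirror the existing spark.submit.pyFiles handling at each point where Python resources are propagated, so the resolved requirements-file URI reaches the driver alongside pyFiles; outside YARN, the file is also merged into args.files and shipped with the job. A minimal sketch of checking this from a PySpark driver follows, assuming the application was submitted with --py-requirements requirements.txt (the file name is a placeholder):

# Minimal driver-side check; assumes the job was submitted with
# --py-requirements requirements.txt (placeholder name).
from pyspark import SparkContext, SparkFiles

sc = SparkContext()

# The submit-side changes above propagate the resolved URI(s) into this property.
print(sc.getConf().get("spark.submit.pyRequirements", ""))

# In non-YARN modes the file is merged into args.files, so its basename
# resolves to a local copy on the driver and executors.
print(SparkFiles.get("requirements.txt"))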
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -64,6 +64,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var verbose: Boolean = false
var isPython: Boolean = false
var pyFiles: String = null
var pyRequirements: String = null
var isR: Boolean = false
var action: SparkSubmitAction = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
@@ -304,6 +305,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| numExecutors $numExecutors
| files $files
| pyFiles $pyFiles
| pyRequirements $pyRequirements
| archives $archives
| mainClass $mainClass
| primaryResource $primaryResource
@@ -395,6 +397,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case PY_FILES =>
pyFiles = Utils.resolveURIs(value)

case PY_REQUIREMENTS =>
pyRequirements = Utils.resolveURIs(value)

case ARCHIVES =>
archives = Utils.resolveURIs(value)

@@ -505,6 +510,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| search for the maven coordinates given with --packages.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
| on the PYTHONPATH for Python apps.
| --py-requirements REQS Pip requirements file with dependencies that will be fetched
| and placed on PYTHONPATH.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
|
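Per the usage text above, --py-requirements takes a pip requirements file whose dependencies are fetched and placed on the PYTHONPATH. A submission using the new option might look like: spark-submit --master yarn --deploy-mode cluster --py-requirements requirements.txt app.py, where requirements.txt and app.py are placeholder names; this mirrors the YARN case exercised in the test below.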
31 changes: 31 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,6 +570,37 @@ class SparkSubmitSuite
appArgs.executorMemory should be ("2.3g")
}
}

test("py-requirements will be distributed") {
val pyReqs = "requirements.txt"

val clArgsYarn = Seq(
"--master", "yarn",
"--deploy-mode", "cluster",
"--py-requirements", pyReqs,
"mister.py"
)

val appArgsYarn = new SparkSubmitArguments(clArgsYarn)
val sysPropsYarn = SparkSubmit.prepareSubmitEnvironment(appArgsYarn)._3
appArgsYarn.pyRequirements should be (Utils.resolveURIs(pyReqs))
sysPropsYarn("spark.yarn.dist.files") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(","))
sysPropsYarn("spark.submit.pyRequirements") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(","))

val clArgs = Seq(
"--master", "local",
"--py-requirements", pyReqs,
"mister.py"
)

val appArgs = new SparkSubmitArguments(clArgs)
val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3
appArgs.pyRequirements should be (Utils.resolveURIs(pyReqs))
sysProps("spark.submit.pyRequirements") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(","))
}
// scalastyle:on println

// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
1 change: 1 addition & 0 deletions launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -55,6 +55,7 @@ class SparkSubmitOptionParser {
protected final String PROPERTIES_FILE = "--properties-file";
protected final String PROXY_USER = "--proxy-user";
protected final String PY_FILES = "--py-files";
protected final String PY_REQUIREMENTS = "--py-requirements";
protected final String REPOSITORIES = "--repositories";
protected final String STATUS = "--status";
protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
6 changes: 6 additions & 0 deletions python/pyspark/context.py
@@ -209,6 +209,12 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment,
self._python_includes.append(filename)
sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))

# Apply requirements file set by spark-submit.
for path in self._conf.get("spark.submit.pyRequirements", "").split(","):
if path != "":
(dirname, filename) = os.path.split(path)
self.addRequirementsFile(os.path.join(SparkFiles.getRootDirectory(), filename))

# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
self._temp_dir = \
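The loop above hands each distributed requirements file to an addRequirementsFile helper that is not shown in this diff. Below is a rough, driver-side-only sketch of what such a helper might do, assuming it pip-installs the listed packages into a temporary directory and puts that directory on sys.path; the name, signature, and behavior are assumptions, and a complete implementation would also have to make the installed packages importable on executors (for example by zipping the directory and passing it to addPyFile).

# Hypothetical sketch of an addRequirementsFile method on SparkContext
# (not part of this diff); the name and behavior are assumptions.
import subprocess
import sys
import tempfile

def addRequirementsFile(self, path):
    """Pip-install the dependencies listed in ``path`` into a temporary
    directory and make them importable on the driver."""
    target = tempfile.mkdtemp()
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "-r", path, "--target", target])
    # Mirror the sys.path handling used for _python_includes above.
    sys.path.insert(1, target)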
