take path to ivy cache as a conf
brkyvz committed Jan 27, 2015
1 parent 2edc9b5 commit c04d885
Showing 4 changed files with 52 additions and 29 deletions.
3 changes: 2 additions & 1 deletion bin/utils.sh
@@ -35,7 +35,8 @@ function gatherSparkSubmitOpts() {
     --master | --deploy-mode | --class | --name | --jars | --maven | --py-files | --files | \
     --conf | --maven-repos | --properties-file | --driver-memory | --driver-java-options | \
     --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
-    --total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
+    --total-executor-cores | --executor-cores | --queue | --num-executors | \
+    --archives | --ivy-repo )
       if [[ $# -lt 2 ]]; then
         "$SUBMIT_USAGE_FUNCTION"
         exit 1;
2 changes: 1 addition & 1 deletion bin/windows-utils.cmd
@@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p
 SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
 SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
 SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
-SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\>"
+SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--maven\> \<--maven-repos\> \<--ivy-repo\>"

 echo %1 | findstr %opts% >nul
 if %ERRORLEVEL% equ 0 (
66 changes: 39 additions & 27 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,7 +22,6 @@ import java.lang.reflect.{Modifier, InvocationTargetException}
 import java.net.URL

 import org.apache.ivy.Ivy
-import org.apache.ivy.ant.IvyDependencyExclude
 import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
 import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
 import org.apache.ivy.core.report.ResolveReport
@@ -69,14 +68,15 @@ object SparkSubmit {

   // Directories for caching downloads through ivy and storing the jars when maven coordinates are
   // supplied to spark-submit
-  // TODO: Take these as arguments? For example, on AWS /mnt/ is a better location.
-  private val IVY_CACHE = new File("ivy/cache")
-  private val MAVEN_JARS = new File("ivy/jars")
+  private var IVY_CACHE: File = null
+  private var MAVEN_JARS: File = null

   // Exposed for testing
   private[spark] var exitFn: () => Unit = () => System.exit(-1)
   private[spark] var printStream: PrintStream = System.err
+
   private[spark] def printWarning(str: String) = printStream.println("Warning: " + str)
+
   private[spark] def printErrorAndExit(str: String) = {
Expand All @@ -94,13 +94,13 @@ object SparkSubmit {

/**
* @return a tuple containing
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a list of system properties and env vars, and
* (4) the main class for the child
* (1) the arguments for the child process,
* (2) a list of classpath entries for the child,
* (3) a list of system properties and env vars, and
* (4) the main class for the child
*/
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {

// Values to return
val childArgs = new ArrayBuffer[String]()
@@ -147,7 +147,7 @@ object SparkSubmit {
       if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
         printErrorAndExit(
           "Could not load YARN classes. " +
-          "This copy of Spark may not have been compiled with YARN support.")
+            "This copy of Spark may not have been compiled with YARN support.")
       }
     }

@@ -186,6 +186,8 @@ object SparkSubmit {
     sysProps("SPARK_SUBMIT") = "true"

     // Resolve maven dependencies if there are any and add classpath to jars
+    IVY_CACHE = new File(s"${args.ivyRepoPath}/cache")
+    MAVEN_JARS = new File(s"${args.ivyRepoPath}/jars")
     val resolvedMavenCoordinates = resolveMavenCoordinates(args.maven, args.mavenRepos)
     if (!resolvedMavenCoordinates.trim.isEmpty) {
       if (args.jars == null || args.jars.trim.isEmpty) {
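For reference, a minimal sketch of how the configured root expands into the two directories assigned above. The /mnt/ivy value is a made-up example; the real root comes from --ivy-repo or spark.jars.ivy and defaults to ivy/ under the working directory:

    import java.io.File

    // Hypothetical root, e.g. passed as: --ivy-repo /mnt/ivy
    val ivyRepoPath = "/mnt/ivy"
    val ivyCache  = new File(s"$ivyRepoPath/cache") // ivy's resolution cache
    val mavenJars = new File(s"$ivyRepoPath/jars")  // resolved jars added to the classpath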
@@ -203,6 +205,7 @@
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
       OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
+      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         sysProp = "spark.driver.memory"),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -214,6 +217,7 @@

       // Standalone cluster only
       OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
       OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

@@ -252,18 +256,26 @@ object SparkSubmit {
       if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
-      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
-      if (args.childArgs != null) { childArgs ++= args.childArgs }
+      if (args.jars != null) {
+        childClasspath ++= args.jars.split(",")
+      }
+      if (args.childArgs != null) {
+        childArgs ++= args.childArgs
+      }
     }


     // Map all arguments to command-line options or system properties for our chosen mode
     for (opt <- options) {
       if (opt.value != null &&
-          (deployMode & opt.deployMode) != 0 &&
-          (clusterManager & opt.clusterManager) != 0) {
-        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
-        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
+        (deployMode & opt.deployMode) != 0 &&
+        (clusterManager & opt.clusterManager) != 0) {
+        if (opt.clOption != null) {
+          childArgs +=(opt.clOption, opt.value)
+        }
+        if (opt.sysProp != null) {
+          sysProps.put(opt.sysProp, opt.value)
+        }
       }
     }

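A hedged sketch of the bitmask check in the loop above, with assumed flag values — the actual constants are defined elsewhere in SparkSubmit; CLIENT = 1 and CLUSTER = 2 here are illustrative only:

    // Assumed values, for illustration only:
    val CLIENT = 1
    val CLUSTER = 2
    val ALL_DEPLOY_MODES = CLIENT | CLUSTER

    // An OptionAssigner fires only when both masks match the current mode:
    val deployMode = CLIENT
    assert((deployMode & ALL_DEPLOY_MODES) != 0) // e.g. spark.jars.ivy is assigned in client mode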
@@ -286,7 +298,7 @@
         childArgs += "--supervise"
       }
       childArgs += "launch"
-      childArgs += (args.master, args.primaryResource, args.mainClass)
+      childArgs +=(args.master, args.primaryResource, args.mainClass)
       if (args.childArgs != null) {
         childArgs ++= args.childArgs
       }
@@ -296,11 +308,11 @@
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.primaryResource != SPARK_INTERNAL) {
-        childArgs += ("--jar", args.primaryResource)
+        childArgs +=("--jar", args.primaryResource)
       }
-      childArgs += ("--class", args.mainClass)
+      childArgs +=("--class", args.mainClass)
       if (args.childArgs != null) {
-        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
+        args.childArgs.foreach { arg => childArgs +=("--arg", arg)}
       }
     }

@@ -341,11 +353,11 @@ object SparkSubmit {
   }

   private def launch(
-      childArgs: ArrayBuffer[String],
-      childClasspath: ArrayBuffer[String],
-      sysProps: Map[String, String],
-      childMainClass: String,
-      verbose: Boolean = false) {
+    childArgs: ArrayBuffer[String],
+    childClasspath: ArrayBuffer[String],
+    sysProps: Map[String, String],
+    childMainClass: String,
+    verbose: Boolean = false) {
     if (verbose) {
       printStream.println(s"Main class:\n$childMainClass")
       printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
@@ -452,8 +464,8 @@ object SparkSubmit {
    */
   private[spark] def mergeFileLists(lists: String*): String = {
     val merged = lists.filter(_ != null)
-                      .flatMap(_.split(","))
-                      .mkString(",")
+      .flatMap(_.split(","))
+      .mkString(",")
     if (merged == "") null else merged
   }

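The behavior of mergeFileLists is unchanged by this reformat; as a self-contained sketch of what it does — nulls are dropped, the remainder joined on commas, and an empty result becomes null:

    def mergeFileLists(lists: String*): String = {
      val merged = lists.filter(_ != null)
        .flatMap(_.split(","))
        .mkString(",")
      if (merged == "") null else merged
    }

    mergeFileLists("a.jar,b.jar", null, "c.jar") // => "a.jar,b.jar,c.jar"
    mergeFileLists(null, null)                   // => null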
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -52,6 +52,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   var jars: String = null
   var maven: String = null
   var mavenRepos: String = null
+  var ivyRepoPath: String = null
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
@@ -125,6 +126,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
       .orNull
     name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
     jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
+    ivyRepoPath = Option(ivyRepoPath)
+      .orElse(sparkProperties.get("spark.jars.ivy")).orElse(Option("ivy/")).get
     deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
     numExecutors = Option(numExecutors)
       .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
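The Option chain added above encodes a clear precedence: an explicit --ivy-repo value wins, then a spark.jars.ivy entry from the loaded Spark properties, then the "ivy/" default. A small sketch with made-up inputs:

    // Hypothetical inputs:
    val fromFlag: Option[String]  = None             // --ivy-repo was not passed
    val fromProps: Option[String] = Some("/opt/ivy") // spark.jars.ivy from a properties file
    val ivyRepoPath = fromFlag.orElse(fromProps).orElse(Option("ivy/")).get
    assert(ivyRepoPath == "/opt/ivy")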
@@ -228,6 +231,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     | jars $jars
     | maven $maven
     | maven-repos $mavenRepos
+    | ivy-repo $ivyRepoPath
     | verbose $verbose
     |
     |Spark properties used, including those specified through
@@ -342,6 +346,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         mavenRepos = value
         parse(tail)

+      case ("--ivy-repo") :: value :: tail =>
+        ivyRepoPath = Utils.resolveURIs(value)
+        parse(tail)
+
       case ("--conf" | "-c") :: value :: tail =>
         value.split("=", 2).toSeq match {
           case Seq(k, v) => sparkProperties(k) = v
@@ -399,6 +407,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
         | coordinates should be groupId:artifactId:version.
         | --maven-repos Supply additional remote repositories as a comma-delimited
         | list to search for the maven coordinates given with --maven.
+        | --ivy-repo The path to use to cache jars downloaded using maven
+        | coordinates. The default is $PWD/ivy
         | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
         | on the PYTHONPATH for Python apps.
         | --files FILES Comma-separated list of files to be placed in the working
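Taken together with the SparkSubmit changes above, the flag and the property should be interchangeable ways to relocate the cache: "spark-submit --ivy-repo /mnt/ivy --maven com.example:lib:1.0 app.jar" and "spark-submit --conf spark.jars.ivy=/mnt/ivy --maven com.example:lib:1.0 app.jar" resolve to the same root. The path and coordinates here are made-up examples; /mnt/ is the kind of location the removed TODO called out as preferable on AWS.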
