managed to exclude Spark and its dependencies
brkyvz committed Jan 27, 2015
1 parent a0870af commit 2edc9b5
Showing 4 changed files with 55 additions and 18 deletions.
2 changes: 1 addition & 1 deletion bin/utils.sh
@@ -33,7 +33,7 @@ function gatherSparkSubmitOpts() {
while (($#)); do
case "$1" in
--master | --deploy-mode | --class | --name | --jars | --maven | --py-files | --files | \
--conf | --maven_repos | --properties-file | --driver-memory | --driver-java-options | \
--conf | --maven-repos | --properties-file | --driver-memory | --driver-java-options | \
--driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
--total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
if [[ $# -lt 2 ]]; then
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1175,7 +1175,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
null
}
} else {
env.httpFileServer.addJar(new File(uri.getPath))
try {
env.httpFileServer.addJar(new File(uri.getPath))
} catch {
case exc: FileNotFoundException =>
logError(s"Jar not found at $path")
null
case e: Exception =>
// For now just log an error but allow it to go through so spark examples work.
// The spark examples don't really need the jar distributed since it's also
// the app jar.
logError("Error adding jar (" + e + "), was the --addJars option used?")
null
}
}
// A JAR file which exists locally on every worker node
case "local" =>
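For reference, a minimal standalone sketch of the guard pattern this hunk adds to SparkContext.addJar: wrap the file-serving call in a try/catch, log when the jar is missing, and fall through with null so submission can continue. The serveJar helper below is made up for illustration and is not Spark's API; only the exception handling mirrors the change.

```scala
import java.io.{File, FileNotFoundException}

object AddJarGuardSketch {
  // Hypothetical stand-in for env.httpFileServer.addJar; not part of Spark.
  def serveJar(file: File): String = {
    if (!file.exists()) throw new FileNotFoundException(file.getPath)
    file.toURI.toString
  }

  // Returns the served URI, or null when the jar is missing or cannot be added,
  // mirroring the try/catch added above.
  def addJarGuarded(path: String): String =
    try {
      serveJar(new File(path))
    } catch {
      case _: FileNotFoundException =>
        println(s"Jar not found at $path")
        null
      case e: Exception =>
        println(s"Error adding jar ($e)")
        null
    }
}
```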
42 changes: 33 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,13 +22,15 @@ import java.lang.reflect.{Modifier, InvocationTargetException}
import java.net.URL

import org.apache.ivy.Ivy
import org.apache.ivy.core.module.descriptor.{DefaultDependencyDescriptor, DefaultModuleDescriptor}
import org.apache.ivy.core.module.id.ModuleRevisionId
import org.apache.ivy.ant.IvyDependencyExclude
import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
import org.apache.ivy.core.report.ResolveReport
import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.resolver.IBiblioResolver
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

@@ -184,7 +186,7 @@ object SparkSubmit {
sysProps("SPARK_SUBMIT") = "true"

// Resolve maven dependencies if there are any and add classpath to jars
val resolvedMavenCoordinates = resolveMavenCoordinates(args.maven, args.maven_repos)
val resolvedMavenCoordinates = resolveMavenCoordinates(args.maven, args.mavenRepos)
if (!resolvedMavenCoordinates.trim.isEmpty) {
if (args.jars == null || args.jars.trim.isEmpty) {
args.jars = resolvedMavenCoordinates
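The hunk above only shows the branch where no user jars were supplied; here is a small sketch of the overall merge, assuming (as the hidden else branch presumably does) that resolved jar paths are appended to any existing --jars value with a comma. Names and behavior are an assumption for illustration, not a copy of SparkSubmit's exact logic.

```scala
object JarListMergeSketch {
  // Combine user-supplied --jars with the comma-separated list of jar paths
  // returned by Maven coordinate resolution.
  def mergeJars(userJars: String, resolved: String): String =
    if (resolved == null || resolved.trim.isEmpty) userJars
    else if (userJars == null || userJars.trim.isEmpty) resolved
    else userJars + "," + resolved
}

// e.g. mergeJars("app-deps.jar", "/ivy/jars/breeze_2.10.jar")
//        == "app-deps.jar,/ivy/jars/breeze_2.10.jar"
```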
@@ -475,24 +477,41 @@ object SparkSubmit {
// create an ivy instance
val ivySettings: IvySettings = new IvySettings
ivySettings.setDefaultCache(IVY_CACHE)
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)

// the biblio resolver resolves POM declared dependencies
val br: IBiblioResolver = new IBiblioResolver
br.setM2compatible(true)
br.setUsepoms(true)
br.setName("central")
ivySettings.addResolver(br)

// We need a chain resolver if we want to check multiple repositories
val cr = new ChainResolver
cr.setName("list")
cr.add(br)

// Add an exclusion rule for Spark
val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
val sparkDependencyExcludeRule =
new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
sparkDependencyExcludeRule.addConfiguration("default")

// add any remote repositories other than maven central
if (remoteRepos != null && !remoteRepos.trim.isEmpty) {
var i = 1
remoteRepos.split(",").foreach { repo =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
brr.setUsepoms(true)
brr.setRoot(repo)
ivySettings.addResolver(brr)
brr.setName(s"repo-$i")
cr.add(brr)
i += 1
}
}
ivySettings.setDefaultResolver(br.getName)
ivySettings.addResolver(cr)
ivySettings.setDefaultResolver(cr.getName)

val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
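Taken together, the resolver wiring above can be exercised against the Ivy API on its own. A sketch with the same structure: Maven Central plus any extra repositories sit behind one ChainResolver named "list", and a glob-matched DefaultExcludeRule keeps every org.apache.spark artifact out of resolution (the rule is attached later with md.addExcludeRule). Cache location and repository URLs are placeholders.

```scala
import java.io.File

import org.apache.ivy.core.module.descriptor.DefaultExcludeRule
import org.apache.ivy.core.module.id.{ArtifactId, ModuleId}
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

object IvyResolverSketch {
  // Maven Central first, then any user-supplied repositories, all behind a
  // single chain resolver that becomes the default.
  def buildSettings(remoteRepos: Seq[String], cacheDir: File): IvySettings = {
    val ivySettings = new IvySettings
    ivySettings.setDefaultCache(cacheDir)           // placeholder cache directory
    ivySettings.addMatcher(new GlobPatternMatcher)  // required by the "glob" exclude rule

    val central = new IBiblioResolver
    central.setM2compatible(true)
    central.setUsepoms(true)
    central.setName("central")

    val chain = new ChainResolver
    chain.setName("list")
    chain.add(central)

    remoteRepos.zipWithIndex.foreach { case (repo, i) =>
      val resolver = new IBiblioResolver
      resolver.setM2compatible(true)
      resolver.setUsepoms(true)
      resolver.setRoot(repo)
      resolver.setName(s"repo-${i + 1}")
      chain.add(resolver)
    }

    ivySettings.addResolver(chain)
    ivySettings.setDefaultResolver(chain.getName)
    ivySettings
  }

  // Exclude every org.apache.spark artifact: the application already runs
  // against the Spark on its classpath.
  def sparkExcludeRule(ivySettings: IvySettings): DefaultExcludeRule = {
    val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
    val rule = new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
    rule.addConfiguration("default")
    rule
  }
}
```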
@@ -503,12 +522,15 @@
val md = DefaultModuleDescriptor.newDefaultInstance(
ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-envelope", "1.0"))

md.addExcludeRule(sparkDependencyExcludeRule)

artifacts.foreach { mvn =>
val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
val dd = new DefaultDependencyDescriptor(ri, false, false)
dd.addDependencyConfiguration("default", "default")
md.addDependency(dd)
}

// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, ro)
if (rr.hasError) {
@@ -521,10 +543,12 @@
new RetrieveOptions().setConfs(Array("default")))

// output downloaded jars to classpath (will append to jars). The name of the jar is given
// after a `!` by Ivy.
// after a `!` by Ivy. It also sometimes contains (bundle) after .jar. Remove that as well.
rr.getArtifacts.toArray.map { case artifactInfo =>
val artifactString = artifactInfo.toString
MAVEN_JARS.getAbsolutePath + "/" + artifactString.drop(artifactString.lastIndexOf("!") + 1)
val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
MAVEN_JARS.getAbsolutePath + "/" +
jarName.substring(0, jarName.lastIndexOf(".jar") + 4)
}.mkString(",")
}
}
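The path handling at the end of the file can also be tried in isolation. A sketch, assuming Ivy renders each artifact as something like org#name;rev!name.jar with an optional trailing (bundle) marker, as the comment above describes:

```scala
object ArtifactPathSketch {
  // Keep only "<name>.jar": drop everything up to the last '!' and anything
  // (such as a "(bundle)" suffix) that follows ".jar".
  def jarFileName(artifactString: String): String = {
    val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
    jarName.substring(0, jarName.lastIndexOf(".jar") + 4)
  }
}

// e.g. jarFileName("org.scalanlp#breeze_2.10;0.10!breeze_2.10.jar (bundle)")
//        == "breeze_2.10.jar"
```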
15 changes: 8 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -51,7 +51,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
var maven: String = null
var maven_repos: String = null
var mavenRepos: String = null
var verbose: Boolean = false
var isPython: Boolean = false
var pyFiles: String = null
@@ -227,7 +227,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| childArgs [${childArgs.mkString(" ")}]
| jars $jars
| maven $maven
| maven_repos $maven_repos
| maven-repos $mavenRepos
| verbose $verbose
|
|Spark properties used, including those specified through
@@ -338,8 +338,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
maven = value
parse(tail)

case ("--maven_repos") :: value :: tail =>
maven_repos = value
case ("--maven-repos") :: value :: tail =>
mavenRepos = value
parse(tail)

case ("--conf" | "-c") :: value :: tail =>
@@ -395,9 +395,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| --maven Comma-separated list of maven coordinates of jars to include
| on the driver and executor classpaths. Will search the local
| maven repo, then maven central and any additional remote
| repositories given by --maven_repos.
| --maven_repos Supply additional remote repositories to search for the
| maven coordinates given with --maven.
| repositories given by --maven-repos. The format for the
| coordinates should be groupId:artifactId:version.
| --maven-repos Supply additional remote repositories as a comma-delimited
| list to search for the maven coordinates given with --maven.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
| on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
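As a usage illustration, the two flags would be combined as --maven groupId:artifactId:version[,...] with an optional --maven-repos repository list; the coordinates and URL below are made up. The list pattern matching that parse uses for these cases can also be mimicked on a toy argument list:

```scala
object ArgParseSketch {
  var maven: String = null
  var mavenRepos: String = null

  // Toy version of the SparkSubmitArguments.parse style: consume each
  // "--flag value" pair and recurse on the remaining arguments.
  def parse(opts: List[String]): Unit = opts match {
    case "--maven" :: value :: tail =>
      maven = value
      parse(tail)
    case "--maven-repos" :: value :: tail =>
      mavenRepos = value
      parse(tail)
    case _ :: tail =>
      parse(tail)  // everything else is ignored in this sketch
    case Nil =>
  }
}

// Hypothetical command line (coordinates and repository URL are examples only):
//   spark-submit --maven org.scalanlp:breeze_2.10:0.10 \
//                --maven-repos https://repo.example.com/maven app.jar
```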
