Commit

Merge branch 'master' of github.com:apache/spark into python_udf
Davies Liu committed Feb 4, 2015
2 parents 7bccc3b + 6aed719 commit 462b334
Showing 22 changed files with 693 additions and 31 deletions.
6 changes: 3 additions & 3 deletions bin/utils.sh
@@ -26,14 +26,14 @@ function gatherSparkSubmitOpts() {
exit 1
fi

# NOTE: If you add or remove spark-sumbmit options,
# NOTE: If you add or remove spark-submit options,
# modify NOT ONLY this script but also SparkSubmitArgument.scala
SUBMISSION_OPTS=()
APPLICATION_OPTS=()
while (($#)); do
case "$1" in
--master | --deploy-mode | --class | --name | --jars | --py-files | --files | \
--conf | --properties-file | --driver-memory | --driver-java-options | \
--master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
--conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
--driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
--total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
if [[ $# -lt 2 ]]; then
2 changes: 1 addition & 1 deletion bin/windows-utils.cmd
@@ -32,7 +32,7 @@ SET opts="\<--master\> \<--deploy-mode\> \<--class\> \<--name\> \<--jars\> \<--p
SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--driver-java-options\>"
SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\>"
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"

echo %1 | findstr %opts% >nul
if %ERRORLEVEL% equ 0 (
11 changes: 11 additions & 0 deletions core/pom.xml
@@ -241,6 +241,17 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
<version>${ivy.version}</version>
</dependency>
<dependency>
<groupId>oro</groupId>
<!-- oro is needed by ivy, but only listed as an optional dependency, so we include it. -->
<artifactId>oro</artifactId>
<version>${oro.version}</version>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
23 changes: 21 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* '''Note:''' We ensure that the byte array for each record in the resulting RDD
* has the provided record length.
*
* @param path Directory to the input data files
* @param recordLength The length at which to split the records
* @return An RDD of data with values, represented as byte arrays
@@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[LongWritable],
classOf[BytesWritable],
conf=conf)
val data = br.map{ case (k, v) => v.getBytes}
val data = br.map { case (k, v) =>
val bytes = v.getBytes
assert(bytes.length == recordLength, "Byte array does not have correct length")
bytes
}
data
}

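For context, a minimal caller-side sketch (not part of this commit; the path and record length are made up) of the contract the assertion above makes explicit: every record in the returned RDD has exactly recordLength bytes.

import org.apache.spark.{SparkConf, SparkContext}

object FixedWidthRecordsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fixed-width-sketch").setMaster("local[*]"))
    // Hypothetical input: binary files made up of 8-byte records.
    val records = sc.binaryRecords("/tmp/fixed-width-records", 8)
    // With the assertion above, every element is exactly 8 bytes long, so this count is 0.
    val malformed = records.filter(_.length != 8).count()
    println(s"Records with unexpected length: $malformed")
    sc.stop()
  }
}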
@@ -1224,7 +1231,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
null
}
} else {
env.httpFileServer.addJar(new File(uri.getPath))
try {
env.httpFileServer.addJar(new File(uri.getPath))
} catch {
case exc: FileNotFoundException =>
logError(s"Jar not found at $path")
null
case e: Exception =>
// For now just log an error but allow it to go through so the Spark examples work.
// The Spark examples don't really need the jar distributed since it's also
// the app jar.
logError("Error adding jar (" + e + "), was the --addJars option used?")
null
}
}
// A JAR file which exists locally on every worker node
case "local" =>
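A minimal sketch (not part of this commit; the jar path is hypothetical) of the caller-visible effect of the addJar change above, assuming the path is handled by the HTTP file server branch: a missing local jar is now logged as an error instead of surfacing as a FileNotFoundException.

import org.apache.spark.{SparkConf, SparkContext}

object AddJarSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("add-jar-sketch").setMaster("local[*]"))
    // Hypothetical missing jar: with the try/catch above, the FileNotFoundException is caught,
    // "Jar not found at ..." is logged, and the application keeps running during setup.
    sc.addJar("/tmp/does-not-exist.jar")
    sc.stop()
  }
}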
217 changes: 215 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -25,6 +25,17 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import org.apache.hadoop.fs.Path

import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
import org.apache.ivy.core.report.ResolveReport
import org.apache.ivy.core.resolve.{IvyNode, ResolveOptions}
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils

@@ -194,6 +205,18 @@ object SparkSubmit {
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

// Resolve maven dependencies if there are any and add classpath to jars
val resolvedMavenCoordinates =
SparkSubmitUtils.resolveMavenCoordinates(
args.packages, Option(args.repositories), Option(args.ivyRepoPath))
if (!resolvedMavenCoordinates.trim.isEmpty) {
if (args.jars == null || args.jars.trim.isEmpty) {
args.jars = resolvedMavenCoordinates
} else {
args.jars += s",$resolvedMavenCoordinates"
}
}

// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
@@ -202,6 +225,7 @@
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -213,6 +237,7 @@

// Standalone cluster only
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

@@ -384,8 +409,8 @@ object SparkSubmit {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
println(s"Failed to load main class $childMainClass.")
println("You need to build Spark with -Phive and -Phive-thriftserver.")
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
@@ -475,6 +500,194 @@ object SparkSubmit {
}
}

/** Provides utility functions to be used inside SparkSubmit. */
private[spark] object SparkSubmitUtils {

// Exposed for testing
private[spark] var printStream = SparkSubmit.printStream

/**
* Represents a Maven Coordinate
* @param groupId the groupId of the coordinate
* @param artifactId the artifactId of the coordinate
* @param version the version of the coordinate
*/
private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String)

/**
* Extracts maven coordinates from a comma-delimited string
* @param coordinates Comma-delimited string of maven coordinates
* @return Sequence of Maven coordinates
*/
private[spark] def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
coordinates.split(",").map { p =>
val splits = p.split(":")
require(splits.length == 3, s"Provided Maven Coordinates must be in the form " +
s"'groupId:artifactId:version'. The coordinate provided is: $p")
require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId cannot be null or " +
s"be whitespace. The groupId provided is: ${splits(0)}")
require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId cannot be null or " +
s"be whitespace. The artifactId provided is: ${splits(1)}")
require(splits(2) != null && splits(2).trim.nonEmpty, s"The version cannot be null or " +
s"be whitespace. The version provided is: ${splits(2)}")
new MavenCoordinate(splits(0), splits(1), splits(2))
}
}
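// Illustrative example (not part of this commit, coordinates chosen arbitrarily): a --packages
// value such as "com.databricks:spark-csv_2.10:1.0.3,org.apache.commons:commons-lang3:3.3.2"
// parses into
//   Seq(MavenCoordinate("com.databricks", "spark-csv_2.10", "1.0.3"),
//       MavenCoordinate("org.apache.commons", "commons-lang3", "3.3.2")),
// while an entry with anything other than three ':'-separated fields fails the require above.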

/**
* Creates the resolver chain for Maven Central plus any additional remote repositories
* @param remoteRepos Comma-delimited string of remote repositories
* @return A ChainResolver used by Ivy to search for and resolve dependencies.
*/
private[spark] def createRepoResolvers(remoteRepos: Option[String]): ChainResolver = {
// We need a chain resolver if we want to check multiple repositories
val cr = new ChainResolver
cr.setName("list")

// the biblio resolver resolves POM declared dependencies
val br: IBiblioResolver = new IBiblioResolver
br.setM2compatible(true)
br.setUsepoms(true)
br.setName("central")
cr.add(br)

val repositoryList = remoteRepos.getOrElse("")
// add any other remote repositories other than maven central
if (repositoryList.trim.nonEmpty) {
repositoryList.split(",").zipWithIndex.foreach { case (repo, i) =>
val brr: IBiblioResolver = new IBiblioResolver
brr.setM2compatible(true)
brr.setUsepoms(true)
brr.setRoot(repo)
brr.setName(s"repo-${i + 1}")
cr.add(brr)
printStream.println(s"$repo added as a remote repository with the name: ${brr.getName}")
}
}
cr
}
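// Illustrative example (not part of this commit): with
//   remoteRepos = Some("http://repo.example.com/a,http://repo.example.com/b")
// (hypothetical URLs), the returned chain contains "central" plus "repo-1" and "repo-2"
// rooted at those two repositories; with remoteRepos = None only Maven Central is searched.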

/**
* Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
* (will append to jars in SparkSubmit). The name of the jar is given
* after a '!' by Ivy. It also sometimes contains '(bundle)' after '.jar'. Remove that as well.
* @param artifacts Sequence of dependencies that were resolved and retrieved
* @param cacheDirectory directory where jars are cached
* @return a comma-delimited list of paths for the dependencies
*/
private[spark] def resolveDependencyPaths(
artifacts: Array[AnyRef],
cacheDirectory: File): String = {
artifacts.map { artifactInfo =>
val artifactString = artifactInfo.toString
val jarName = artifactString.drop(artifactString.lastIndexOf("!") + 1)
cacheDirectory.getAbsolutePath + File.separator +
jarName.substring(0, jarName.lastIndexOf(".jar") + 4)
}.mkString(",")
}
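// Illustrative example (not part of this commit): an artifact whose string form ends in
// "!commons-lang3.jar" (possibly followed by "(bundle)") maps to
// <cacheDirectory>/commons-lang3.jar; drop() keeps everything after the last '!' and
// substring() trims anything after ".jar".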

/** Adds the given maven coordinates to Ivy's module descriptor. */
private[spark] def addDependenciesToIvy(
md: DefaultModuleDescriptor,
artifacts: Seq[MavenCoordinate],
ivyConfName: String): Unit = {
artifacts.foreach { mvn =>
val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, mvn.version)
val dd = new DefaultDependencyDescriptor(ri, false, false)
dd.addDependencyConfiguration(ivyConfName, ivyConfName)
printStream.println(s"${dd.getDependencyId} added as a dependency")
md.addDependency(dd)
}
}

/** Creates the module descriptor that the requested dependencies are attached to; also used in tests. Values are dummy strings. */
private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))

/**
* Resolves any dependencies that were supplied through maven coordinates
* @param coordinates Comma-delimited string of maven coordinates
* @param remoteRepos Comma-delimited string of remote repositories other than maven central
* @param ivyPath The path to the local ivy repository
* @return The comma-delimited path to the jars of the given maven artifacts including their
* transitive dependencies
*/
private[spark] def resolveMavenCoordinates(
coordinates: String,
remoteRepos: Option[String],
ivyPath: Option[String],
isTest: Boolean = false): String = {
if (coordinates == null || coordinates.trim.isEmpty) {
""
} else {
val artifacts = extractMavenCoordinates(coordinates)
// Default configuration name for ivy
val ivyConfName = "default"
// set ivy settings for location of cache
val ivySettings: IvySettings = new IvySettings
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
val alternateIvyCache = ivyPath.getOrElse("")
val packagesDirectory: File =
if (alternateIvyCache.trim.isEmpty) {
new File(ivySettings.getDefaultIvyUserDir, "jars")
} else {
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
new File(alternateIvyCache, "jars")
}
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(remoteRepos)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)

val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
val resolveOptions = new ResolveOptions
resolveOptions.setTransitive(true)
val retrieveOptions = new RetrieveOptions
// Turn downloading and logging off for testing
if (isTest) {
resolveOptions.setDownload(false)
resolveOptions.setLog(LogOptions.LOG_QUIET)
retrieveOptions.setLog(LogOptions.LOG_QUIET)
} else {
resolveOptions.setDownload(true)
}

// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)

// Add an exclusion rule for Spark
val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
val sparkDependencyExcludeRule =
new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
sparkDependencyExcludeRule.addConfiguration(ivyConfName)

// Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
md.addExcludeRule(sparkDependencyExcludeRule)
addDependenciesToIvy(md, artifacts, ivyConfName)

// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
throw new RuntimeException(rr.getAllProblemMessages.toString)
}
// retrieve all resolved dependencies
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
retrieveOptions.setConfs(Array(ivyConfName)))

resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
}
}
}

/**
* Provides an indirection layer for passing arguments as system properties or flags to
* the user's driver program or to downstream launcher tools.
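Stepping back from the diff, a minimal sketch (not part of this commit) of driving the new resolver directly; the coordinate is illustrative, and it assumes the artifact's metadata is reachable from Maven Central. Passing isTest = true keeps Ivy from downloading jars and quiets its logging, mirroring how a unit test might exercise this path.

package org.apache.spark.deploy

// Sketch only: SparkSubmitUtils is private[spark], so this object must live under org.apache.spark.
object ResolveSketch {
  def main(args: Array[String]): Unit = {
    // Comma-delimited Maven coordinates, exactly as they would be passed to --packages.
    val coordinates = "com.databricks:spark-csv_2.10:1.0.3" // illustrative coordinate
    // No extra repositories and no custom ivy path; isTest = true skips artifact downloads.
    val jarPaths = SparkSubmitUtils.resolveMavenCoordinates(
      coordinates, remoteRepos = None, ivyPath = None, isTest = true)
    // jarPaths is the comma-delimited list of jar locations that SparkSubmit appends to
    // args.jars so the packages end up on the driver and executor classpaths.
    println(s"Resolved: $jarPaths")
  }
}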
