Merge branch 'master' into flume-python
zsxwing committed Jun 19, 2015
2 parents 4762c34 + 47af7c1 commit b8d5551
Showing 174 changed files with 3,488 additions and 1,165 deletions.
1 change: 1 addition & 0 deletions LICENSE
@@ -950,3 +950,4 @@ The following components are provided under the MIT License. See project link for details.
(MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
(The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
(MIT License) jquery (https://jquery.org/license/)
(MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)
2 changes: 1 addition & 1 deletion R/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=R-unit-tests.log
log4j.appender.file.file=R/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

Expand Down
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -192,7 +192,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"

private val authOn = sparkConf.getBoolean("spark.authenticate", false)
private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
// keep spark.ui.acls.enable for backwards compatibility with 1.0
private var aclsOn =
sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
@@ -365,10 +365,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
cookie
} else {
// user must have set spark.authenticate.secret config
sparkConf.getOption("spark.authenticate.secret") match {
// For Master/Worker, auth secret is in conf; for Executors, it is in env variable
sys.env.get(SecurityManager.ENV_AUTH_SECRET)
.orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
case Some(value) => value
case None => throw new Exception("Error: a secret key must be specified via the " +
"spark.authenticate.secret config")
SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
}
}
sCookie
@@ -449,3 +451,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
override def getSaslUser(appId: String): String = getSaslUser()
override def getSecretKey(appId: String): String = getSecretKey()
}

private[spark] object SecurityManager {

val SPARK_AUTH_CONF: String = "spark.authenticate"
val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
// This is used to set auth secret to an executor's env variable. It should have the same
// value as SPARK_AUTH_SECRET_CONF set in SparkConf
val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
}
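
Note: the new companion object centralizes the auth-related keys. A minimal sketch (not part of this commit; the secret value is a placeholder) of how standalone authentication would be configured with them, and how the secret now reaches executors:

import org.apache.spark.{SecurityManager, SparkConf}

val conf = new SparkConf()
  .set(SecurityManager.SPARK_AUTH_CONF, "true")           // "spark.authenticate"
  .set(SecurityManager.SPARK_AUTH_SECRET_CONF, "s3cr3t")  // "spark.authenticate.secret" (placeholder)
val securityMgr = new SecurityManager(conf)

// On the Master/Worker the secret is read from the conf; on an executor the
// sys.env.get(...).orElse(sparkConf.getOption(...)) lookup above checks the
// _SPARK_AUTH_SECRET environment variable first and falls back to the conf.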
@@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
ow.setConf(new Configuration())
ow.setConf(new Configuration(false))
ow.readFields(in)
t = ow.get().asInstanceOf[T]
}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -557,7 +557,7 @@ private[spark] object SparkConf extends Logging {
def isExecutorStartupConf(name: String): Boolean = {
isAkkaConf(name) ||
name.startsWith("spark.akka") ||
name.startsWith("spark.auth") ||
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
isSparkPortConf(name)
}
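
An assumed illustration (not part of the diff) of the new predicate: auth settings in general are still forwarded to executors at startup, but the secret itself no longer is, since it now travels through the executor's environment rather than the launch command:

// Within org.apache.spark, the companion object behaves roughly as follows:
SparkConf.isExecutorStartupConf(SecurityManager.SPARK_AUTH_CONF)        // true  ("spark.authenticate")
SparkConf.isExecutorStartupConf(SecurityManager.SPARK_AUTH_SECRET_CONF) // false ("spark.authenticate.secret")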
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -974,7 +974,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
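
SerializableConfiguration, used here and in the Python RDD code below, is a small wrapper introduced alongside this change; SerializableJobConf (next file) is presumably the analogous wrapper for JobConf. A rough sketch of what such a wrapper looks like (field and member names assumed; details may differ from the actual org.apache.spark.util class):

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

private[spark] class SerializableConfiguration(@transient var value: Configuration)
  extends Serializable {

  // Hadoop's Configuration is itself a Writable, so delegate Java serialization to it.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false) // false: skip loading default resources, as in the
                                     // SerializableWritable fix above
    value.readFields(in)
  }
}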
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.util.SerializableJobConf

/**
* Internal helper class that saves an RDD using a Hadoop OutputFormat.
@@ -42,7 +43,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
with Serializable {

private val now = new Date()
private val conf = new SerializableWritable(jobConf)
private val conf = new SerializableJobConf(jobConf)

private var jobID = 0
private var splitID = 0
@@ -19,8 +19,8 @@ package org.apache.spark.api.python

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.{Logging, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
@@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
* Other objects are passed through without conversion.
*/
private[python] class WritableToJavaConverter(
conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
conf: Broadcast[SerializableConfiguration]) extends Converter[Any, Any] {

/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
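
For reference, a hedged sketch of how the broadcast configuration is unwrapped on the executor side (the inner field name value is assumed from the SerializableConfiguration sketch above); broadcasting avoids re-shipping a Hadoop configuration of roughly 10 KB with every task:

// given conf: Broadcast[SerializableConfiguration]
val hadoopConf: Configuration = conf.value.value  // Broadcast -> wrapper -> Hadoop Configuration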
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
import org.apache.spark.util.{SerializableConfiguration, Utils}

import scala.util.control.NonFatal

@@ -445,7 +445,7 @@ private[spark] object PythonRDD extends Logging {
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -471,7 +471,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -497,7 +497,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -540,7 +540,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -566,7 +566,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
22 changes: 15 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -35,7 +35,8 @@ import org.apache.ivy.core.resolve.ResolveOptions
import org.apache.ivy.core.retrieve.RetrieveOptions
import org.apache.ivy.core.settings.IvySettings
import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -735,8 +736,14 @@ private[spark] object SparkSubmitUtils {
}

/** Path of the local Maven cache. */
private[spark] def m2Path: File = new File(System.getProperty("user.home"),
".m2" + File.separator + "repository" + File.separator)
private[spark] def m2Path: File = {
if (Utils.isTesting) {
// test builds delete the maven cache, and this can cause flakiness
new File("dummy", ".m2" + File.separator + "repository")
} else {
new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
}
}

/**
* Extracts maven coordinates from a comma-delimited string
@@ -756,12 +763,13 @@ private[spark] object SparkSubmitUtils {
localM2.setName("local-m2-cache")
cr.add(localM2)

val localIvy = new IBiblioResolver
localIvy.setRoot(new File(ivySettings.getDefaultIvyUserDir,
"local" + File.separator).toURI.toString)
val localIvy = new FileSystemResolver
val localIvyRoot = new File(ivySettings.getDefaultIvyUserDir, "local")
localIvy.setLocal(true)
localIvy.setRepository(new FileRepository(localIvyRoot))
val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
"[artifact](-[classifier]).[ext]").mkString(File.separator)
localIvy.setPattern(ivyPattern)
localIvy.addIvyPattern(localIvyRoot.getAbsolutePath + File.separator + ivyPattern)
localIvy.setName("local-ivy-cache")
cr.add(localIvy)

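A hypothetical example of how the ivy pattern above resolves (organisation, module, and revision are placeholders): a jar published to the local ivy repository as org.example#mylib;0.1 would be looked up at roughly

  <ivy-user-dir>/local/org.example/mylib/0.1/jars/mylib.jar

i.e. the standard ivy "local" layout, rather than the Maven-style layout an IBiblioResolver expects.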
@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)

ui.setAppName(s"${appInfo.name} ($appId)")
appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }

val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -282,8 +282,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
logInfo(s"Application log ${res.logPath} loaded successfully.")
Some(res)
res match {
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
"The application may have not started.")
}
res
} catch {
case e: Exception =>
logError(
@@ -429,9 +433,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/**
* Replays the events in the specified log file and returns information about the associated
* application.
* application. Return `None` if the application ID cannot be located.
*/
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
private def replay(
eventLog: FileStatus,
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
Expand All @@ -445,16 +451,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)
appListener.appId.map { appId =>
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appId,
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)
}
} finally {
logInput.close()
}
@@ -24,6 +24,7 @@ import scala.collection.JavaConversions._
import scala.collection.Map

import org.apache.spark.Logging
import org.apache.spark.SecurityManager
import org.apache.spark.deploy.Command
import org.apache.spark.launcher.WorkerCommandBuilder
import org.apache.spark.util.Utils
@@ -40,12 +41,14 @@ object CommandUtils extends Logging {
*/
def buildProcessBuilder(
command: Command,
securityMgr: SecurityManager,
memory: Int,
sparkHome: String,
substituteArguments: String => String,
classPaths: Seq[String] = Seq[String](),
env: Map[String, String] = sys.env): ProcessBuilder = {
val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env)
val localCommand = buildLocalCommand(
command, securityMgr, substituteArguments, classPaths, env)
val commandSeq = buildCommandSeq(localCommand, memory, sparkHome)
val builder = new ProcessBuilder(commandSeq: _*)
val environment = builder.environment()
@@ -69,27 +72,34 @@
*/
private def buildLocalCommand(
command: Command,
securityMgr: SecurityManager,
substituteArguments: String => String,
classPath: Seq[String] = Seq[String](),
env: Map[String, String]): Command = {
val libraryPathName = Utils.libraryPathEnvName
val libraryPathEntries = command.libraryPathEntries
val cmdLibraryPath = command.environment.get(libraryPathName)

val newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
var newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName)
command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
} else {
command.environment
}

// set auth secret to env variable if needed
if (securityMgr.isAuthenticationEnabled) {
newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey)
}

Command(
command.mainClass,
command.arguments.map(substituteArguments),
newEnvironment,
command.classPathEntries ++ classPath,
Seq[String](), // library path already captured in environment variable
command.javaOpts)
// filter out auth secret from java options
command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
}

/** Spawn a thread that will redirect a given stream to a file */
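
The net effect on how a worker launches a child process, sketched with placeholder names: previously the secret could end up in the child's java options as -Dspark.authenticate.secret=..., i.e. on the command line; with this change that option is filtered out and the worker instead does the equivalent of

  builder.environment().put(SecurityManager.ENV_AUTH_SECRET, securityMgr.getSecretKey())

so the secret is only exposed through the child process environment.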
@@ -85,8 +85,8 @@ private[deploy] class DriverRunner(
}

// TODO: If we add ability to submit multiple jars they should also be added here
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
sparkHome.getAbsolutePath, substituteVariables)
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
launchDriver(builder, driverDir, driverDesc.supervise)
}
catch {
@@ -25,7 +25,7 @@ import akka.actor.ActorRef
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files

import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.{SecurityManager, SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.util.Utils
@@ -125,8 +125,8 @@ private[deploy] class ExecutorRunner(
private def fetchAndRunExecutor() {
try {
// Launch the process
val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
sparkHome.getAbsolutePath, substituteVariables)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
