Skip to content

Commit

Permalink
Merge pull request apache#10 from apache/master
Browse files Browse the repository at this point in the history
Update
  • Loading branch information
YanTangZhai committed Nov 11, 2014
2 parents d26d982 + c764d0a commit e249846
Show file tree
Hide file tree
Showing 220 changed files with 6,998 additions and 1,493 deletions.
21 changes: 20 additions & 1 deletion LICENSE
Expand Up @@ -754,7 +754,7 @@ SUCH DAMAGE.


========================================================================
For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
========================================================================
Copyright (C) 2008 The Android Open Source Project

Expand All @@ -771,6 +771,25 @@ See the License for the specific language governing permissions and
limitations under the License.


========================================================================
For LimitedInputStream
(network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
========================================================================
Copyright (C) 2007 The Guava Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


========================================================================
BSD-style licenses
========================================================================
Expand Down
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -13,7 +13,8 @@ and Spark Streaming for stream processing.
## Online Documentation

You can find the latest Spark documentation, including a programming
guide, on the [project web page](http://spark.apache.org/documentation.html).
guide, on the [project web page](http://spark.apache.org/documentation.html)
and [project wiki](https://cwiki.apache.org/confluence/display/SPARK).
This README file only contains basic setup instructions.

## Building Spark
Expand Down
Expand Up @@ -39,6 +39,8 @@ $(function() {
var column = "table ." + $(this).attr("name");
$(column).hide();
});
// Stripe table rows after rows have been hidden to ensure correct striping.
stripeTables();

$("input:checkbox").click(function() {
var column = "table ." + $(this).attr("name");
Expand Down
5 changes: 0 additions & 5 deletions core/src/main/resources/org/apache/spark/ui/static/table.js
Expand Up @@ -28,8 +28,3 @@ function stripeTables() {
});
});
}

/* Stripe all tables after pages finish loading. */
$(function() {
stripeTables();
});
14 changes: 14 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Expand Up @@ -120,6 +120,20 @@ pre {
border: none;
}

.stacktrace-details {
max-height: 300px;
overflow-y: auto;
margin: 0;
transition: max-height 0.5s ease-out, padding 0.5s ease-out;
}

.stacktrace-details.collapsed {
max-height: 0;
padding-top: 0;
padding-bottom: 0;
border: none;
}

span.expand-additional-metrics {
cursor: pointer;
}
Expand Down
Expand Up @@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
verifyBounds()

// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
Expand All @@ -77,9 +76,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)

// How long an executor must be idle for before it is removed
private val removeThresholdSeconds = conf.getLong(
private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

validateSettings()

// Number of executors to add in the next round
private var numExecutorsToAdd = 1

Expand All @@ -103,17 +107,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Polling loop interval (ms)
private val intervalMillis: Long = 100

// Whether we are testing this class. This should only be used internally.
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock

/**
* Verify that the lower and upper bounds on the number of executors are valid.
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
*/
private def verifyBounds(): Unit = {
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
}
Expand All @@ -124,6 +125,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
}
if (schedulerBacklogTimeout <= 0) {
throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
}
if (sustainedSchedulerBacklogTimeout <= 0) {
throw new SparkException(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
}
if (executorIdleTimeout <= 0) {
throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
}

/**
Expand Down Expand Up @@ -254,7 +271,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
if (removeRequestAcknowledged) {
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
true
} else {
Expand Down Expand Up @@ -329,8 +346,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
private def onExecutorIdle(executorId: String): Unit = synchronized {
if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
}
}

Expand Down
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Expand Up @@ -22,6 +22,7 @@ import java.net.{Authenticator, PasswordAuthentication}
import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.network.sasl.SecretKeyHolder

/**
* Spark class responsible for security.
Expand Down Expand Up @@ -84,7 +85,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* Authenticator installed in the SecurityManager to how it does the authentication
* and in this case gets the user name and password from the request.
*
* - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
* - BlockTransferService -> The Spark BlockTransferServices uses java nio to asynchronously
* exchange messages. For this we use the Java SASL
* (Simple Authentication and Security Layer) API and again use DIGEST-MD5
* as the authentication mechanism. This means the shared secret is not passed
Expand All @@ -98,7 +99,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* of protection they want. If we support those, the messages will also have to
* be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
*
* Since the connectionManager does asynchronous messages passing, the SASL
* Since the NioBlockTransferService does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
* and a Server, so for a particular connection is has to determine what to do.
* A ConnectionId was added to be able to track connections and is used to
Expand All @@ -107,6 +108,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
* and waits for the response from the server and does the handshake before sending
* the real message.
*
* The NettyBlockTransferService ensures that SASL authentication is performed
* synchronously prior to any other communication on a connection. This is done in
* SaslClientBootstrap on the client side and SaslRpcHandler on the server side.
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
* properly. For non-Yarn deployments, users can write a filter to go through a
Expand Down Expand Up @@ -139,7 +144,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* can take place.
*/

private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {

// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"
Expand Down Expand Up @@ -337,4 +342,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
* @return the secret key as a String if authentication is enabled, otherwise returns null
*/
def getSecretKey(): String = secretKey

// Default SecurityManager only has a single secret key, so ignore appId.
override def getSaslUser(appId: String): String = getSaslUser()
override def getSecretKey(appId: String): String = getSecretKey()
}
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Expand Up @@ -217,6 +217,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
*/
getAll.filter { case (k, _) => isAkkaConf(k) }

/**
* Returns the Spark application id, valid in the Driver after TaskScheduler registration and
* from the start in the Executor.
*/
def getAppId: String = get("spark.app.id")

/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)

Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Expand Up @@ -313,6 +313,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
val applicationId: String = taskScheduler.applicationId()
conf.set("spark.app.id", applicationId)

env.blockManager.initialize(applicationId)

val metricsSystem = env.metricsSystem

// The metrics system for Driver need to be set spark.app.id to app ID.
Expand Down Expand Up @@ -558,6 +560,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {


/**
* :: Experimental ::
*
* Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
* (useful for binary data)
*
Expand Down Expand Up @@ -600,6 +604,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
}

/**
* :: Experimental ::
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
* @param path Directory to the input data files
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Expand Up @@ -276,7 +276,7 @@ object SparkEnv extends Logging {
val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
case "netty" =>
new NettyBlockTransferService(conf)
new NettyBlockTransferService(conf, securityManager)
case "nio" =>
new NioBlockTransferService(conf, securityManager)
}
Expand All @@ -285,8 +285,9 @@ object SparkEnv extends Logging {
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

Expand Down
Expand Up @@ -26,6 +26,7 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD

/**
Expand Down

0 comments on commit e249846

Please sign in to comment.