Skip to content

Commit

Permalink
Merge branch 'master' into tree
Browse files Browse the repository at this point in the history
  • Loading branch information
mengxr committed Jul 1, 2014
2 parents be6a88a + 05c3d90 commit 8a2a59c
Show file tree
Hide file tree
Showing 259 changed files with 9,315 additions and 2,706 deletions.
3 changes: 3 additions & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ target
.classpath
.mima-excludes
.generated-mima-excludes
.generated-mima-class-excludes
.generated-mima-member-excludes
.rat-excludes
.*md
derby.log
Expand All @@ -22,6 +24,7 @@ spark-env.sh.template
log4j-defaults.properties
sorttable.js
.*txt
.*json
.*data
.*log
cloudpickle.py
Expand Down
5 changes: 5 additions & 0 deletions bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ else
fi

if [[ "$1" =~ org.apache.spark.tools.* ]]; then
if test -z "$SPARK_TOOLS_JAR"; then
echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SCALA_VERSION/" 1>&2
echo "You need to build spark before running $1." 1>&2
exit 1
fi
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
fi

Expand Down
1 change: 1 addition & 0 deletions conf/log4j.properties.template
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}:

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
11 changes: 11 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
Expand Down Expand Up @@ -244,6 +250,11 @@
<artifactId>easymockclassextension</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}:

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
4 changes: 4 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ span.expand-details {
float: right;
}

pre {
font-size: 0.8em;
}

.stage-details {
max-height: 100px;
overflow-y: auto;
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class Accumulable[R, T] (
Accumulators.register(this, false)
}

override def toString = value_.toString
override def toString = if (value_ == null) "null" else value_.toString
}

/**
Expand Down
169 changes: 96 additions & 73 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,106 +19,57 @@ package org.apache.spark

import scala.collection.mutable.{ArrayBuffer, HashSet}

import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus, RDDBlockId, StorageLevel}
import org.apache.spark.storage._

/**
* Spark class responsible for passing RDDs split contents to the BlockManager and making
* Spark class responsible for passing RDDs partition contents to the BlockManager and making
* sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Keys of RDD splits that are being computed/loaded. */
/** Keys of RDD partitions that are being computed/loaded. */
private val loading = new HashSet[RDDBlockId]()

/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
rdd: RDD[T],
split: Partition,
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {

val key = RDDBlockId(rdd.id, split.index)
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(values) =>
case Some(blockResult) =>
// Partition is already materialized, so just return its values
new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
// Mark the split as loading (unless someone else marks it first)
loading.synchronized {
if (loading.contains(key)) {
logInfo(s"Another thread is loading $key, waiting for it to finish...")
while (loading.contains(key)) {
try {
loading.wait()
} catch {
case e: Exception =>
logWarning(s"Got an exception while waiting for another thread to load $key", e)
}
}
logInfo(s"Finished waiting for $key")
/* See whether someone else has successfully loaded it. The main way this would fail
* is for the RDD-level cache eviction policy if someone else has loaded the same RDD
* partition but we didn't want to make space for it. However, that case is unlikely
* because it's unlikely that two threads would work on the same RDD partition. One
* downside of the current code is that threads wait serially if this does happen. */
blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
loading.add(key)
}
} else {
loading.add(key)
}
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish and return its results
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}

// Otherwise, we have to load the partition ourselves
try {
// If we got here, we have to load the split
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(split, context)
val computedValues = rdd.computeOrReadCheckpoint(partition, context)

// Persist the result, so long as the task is not running locally
// If the task is running locally, do not persist the result
if (context.runningLocally) {
return computedValues
}

// Keep track of blocks with updated statuses
var updatedBlocks = Seq[(BlockId, BlockStatus)]()
val returnValue: Iterator[T] = {
if (storageLevel.useDisk && !storageLevel.useMemory) {
/* In the case that this RDD is to be persisted using DISK_ONLY
* the iterator will be passed directly to the blockManager (rather than
* caching it to an ArrayBuffer first), then the resulting block data iterator
* will be passed back to the user. If the iterator generates a lot of data,
* this means that it doesn't all have to be held in memory at one time.
* This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
* blocks aren't dropped by the block store before enabling that. */
updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
blockManager.get(key) match {
case Some(values) =>
values.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new SparkException("Block manager failed to return persisted value")
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
// if we're dealing with a 'one-time' iterator
val elements = new ArrayBuffer[Any]
elements ++= computedValues
updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
}
}

// Update task metrics to include any blocks whose storage status is updated
val metrics = context.taskMetrics
metrics.updatedBlocks = Some(updatedBlocks)

new InterruptibleIterator(context, returnValue)
// Otherwise, cache the values and keep track of any updates in block statuses
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
context.taskMetrics.updatedBlocks = Some(updatedBlocks)
new InterruptibleIterator(context, cachedValues)

} finally {
loading.synchronized {
Expand All @@ -128,4 +79,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
}
}

/**
 * Try to take the loading lock for the partition with the given block ID.
 *
 * When no other thread is loading the partition, the ID is recorded in `loading` and None is
 * returned, which means the caller now owns the lock. When another thread is already loading
 * it, this method blocks on the `loading` monitor until the ID is removed and then looks the
 * block up in the BlockManager. If the block is there its values are returned; if not, the
 * lock is taken on behalf of the caller and None is returned.
 */
private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
  loading.synchronized {
    if (loading.contains(id)) {
      // Someone else got here first: wait until they release the partition
      logInfo(s"Another thread is loading $id, waiting for it to finish...")
      while (loading.contains(id)) {
        try {
          loading.wait()
        } catch {
          case e: Exception =>
            logWarning(s"Exception while waiting for another thread to load $id", e)
        }
      }
      logInfo(s"Finished waiting for $id")
      blockManager.get(id) match {
        case Some(blockResult) =>
          Some(blockResult.data.asInstanceOf[Iterator[T]])
        case None =>
          /* The other thread finishing does not guarantee the block exists. For instance,
           * the block could be evicted after it was put, but before our get. We then have
           * to load the partition ourselves, so take the lock before returning. */
          logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
          loading.add(id)
          None
      }
    } else {
      // Nobody is loading this partition: claim it so the caller can compute its value
      loading.add(id)
      None
    }
  }
}

/**
 * Store the values of a partition in the BlockManager and return an iterator over them.
 *
 * Every block status update reported by the BlockManager while storing is appended to
 * `updatedBlocks`. Throws a BlockException if a block stored without memory cannot be
 * read back.
 */
private def putInBlockManager[T](
    key: BlockId,
    values: Iterator[T],
    storageLevel: StorageLevel,
    updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {

  if (storageLevel.useMemory) {
    /* The RDD is cached in memory. Handing the iterator to the BlockManager and reading it
     * back later is not safe here, because the partition may be dropped from the memory
     * store before we get it back, e.g. when the whole RDD does not fit in memory. So the
     * values are unrolled into a buffer and served from that buffer. */
    val buffered = new ArrayBuffer[Any]
    buffered ++= values
    updatedBlocks ++= blockManager.put(key, buffered, storageLevel, tellMaster = true)
    buffered.iterator.asInstanceOf[Iterator[T]]
  } else {
    /* The RDD is not cached in memory, so the iterator goes straight to the BlockManager
     * without being fully unrolled first. Unrolling potentially uses much more memory and
     * risks OOM exceptions that can be avoided. */
    updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
    blockManager.get(key).map(_.data.asInstanceOf[Iterator[T]]).getOrElse {
      logInfo(s"Failure to store $key")
      throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  }
}

}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class ShuffleDependency[K, V, C](
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None)
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

val shuffleId: Int = rdd.context.newShuffleId()
Expand Down
25 changes: 15 additions & 10 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import scala.concurrent.Await

import akka.actor._
import akka.pattern.ask

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.MetadataFetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util._

Expand Down Expand Up @@ -105,14 +107,17 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
Await.result(future, timeout)
} catch {
case e: Exception =>
logError("Error communicating with MapOutputTracker", e)
throw new SparkException("Error communicating with MapOutputTracker", e)
}
}

/** Send a one-way message to the trackerActor, to which we expect it to reply with true. */
protected def sendTracker(message: Any) {
if (askTracker(message) != true) {
throw new SparkException("Error reply received from MapOutputTracker")
val response = askTracker(message)
if (response != true) {
throw new SparkException(
"Error reply received from MapOutputTracker. Expecting true, got " + response.toString)
}
}

Expand Down Expand Up @@ -168,8 +173,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
}
} else {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing all output locations for shuffle " + shuffleId))
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
}
} else {
statuses.synchronized {
Expand Down Expand Up @@ -364,15 +369,15 @@ private[spark] object MapOutputTracker {
// any of the statuses is null (indicating a missing location due to a failed mapper),
// throw a FetchFailedException.
private def convertMapStatuses(
shuffleId: Int,
reduceId: Int,
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
shuffleId: Int,
reduceId: Int,
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
assert (statuses != null)
statuses.map {
status =>
if (status == null) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
throw new MetadataFetchFailedException(
shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
} else {
(status.location, decompressSize(status.compressedSizes(reduceId)))
}
Expand Down Expand Up @@ -401,7 +406,7 @@ private[spark] object MapOutputTracker {
if (compressedSize == 0) {
0
} else {
math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
math.pow(LOG_BASE, compressedSize & 0xFF).toLong
}
}
}
Loading

0 comments on commit 8a2a59c

Please sign in to comment.