[SPARK-12895][SPARK-12896] Migrate TaskMetrics to accumulators
The high level idea is that instead of having the executors send both accumulator updates and TaskMetrics, we should have them send only accumulator updates. This eliminates the need to maintain both code paths since one can be implemented in terms of the other. This effort is split into two parts:

**SPARK-12895: Implement TaskMetrics using accumulators.** TaskMetrics is basically just a bunch of accumulable fields. This patch makes TaskMetrics a syntactic wrapper around a collection of accumulators so we don't need to send TaskMetrics from the executors to the driver.
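
To make the "syntactic wrapper" idea concrete, here is a minimal, self-contained Scala sketch. The names (`SketchAccumulator`, `SketchTaskMetrics`) and the particular metrics are hypothetical illustrations, not the classes this patch introduces; the point is only that each metric is backed by an accumulator and that metric setters reduce to accumulator adds.

```scala
// Hypothetical sketch only: a metric is a named accumulator, and TaskMetrics is a
// thin typed facade over a fixed set of such accumulators.
final class SketchAccumulator(val id: Long, val name: String) {
  private var current: Long = 0L
  def add(v: Long): Unit = { current += v }
  def localValue: Long = current
}

final class SketchTaskMetrics(newId: () => Long) {
  private val peakExecutionMemory = new SketchAccumulator(newId(), "peakExecutionMemory")
  private val diskBytesSpilled    = new SketchAccumulator(newId(), "diskBytesSpilled")

  // Metric setters are just accumulator additions.
  def incPeakExecutionMemory(v: Long): Unit = peakExecutionMemory.add(v)
  def incDiskBytesSpilled(v: Long): Unit = diskBytesSpilled.add(v)

  // Nothing beyond (accumulator id, value) pairs needs to leave the executor.
  def accumulatorUpdates(): Seq[(Long, Long)] =
    Seq(peakExecutionMemory, diskBytesSpilled).map(a => a.id -> a.localValue)
}
```

Viewed this way, a call such as `taskContext.taskMetrics().incPeakExecutionMemory(...)` in the diff below is just an add on an internal accumulator, which is what allows the old `internalMetricsToAccumulators` plumbing to be deleted.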

**SPARK-12896: Send only accumulator updates to the driver.** Now that TaskMetrics are expressed in terms of accumulators, we can capture all TaskMetrics values if we just send accumulator updates from the executors to the driver. This completes the parent issue SPARK-10620.
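
The driver-side half can be pictured with a similarly simplified sketch (again hypothetical names and types; the real patch uses `AccumulableInfo` and the `Heartbeat` message changed in HeartbeatReceiver.scala below): executors ship only per-task accumulator updates, and the driver folds each update into the accumulator registered under that ID, ignoring IDs it no longer knows about.

```scala
object HeartbeatSketch {
  // Hypothetical types only: the payload carries (taskId -> accumulator updates), nothing else.
  final case class SketchAccumUpdate(id: Long, update: Long)
  final case class SketchHeartbeat(
      executorId: String,
      accumUpdates: Seq[(Long, Seq[SketchAccumUpdate])]) // taskId -> updates for that task

  // Driver-side copy of a registered accumulator.
  final class SketchDriverAccum(val id: Long) {
    private var total = 0L
    def merge(partial: Long): Unit = { total += partial }
    def value: Long = total
  }

  // Fold every reported partial value into the matching registered accumulator;
  // updates for unknown IDs (e.g. garbage-collected accumulators) are dropped.
  def handleHeartbeat(hb: SketchHeartbeat, registered: Map[Long, SketchDriverAccum]): Unit = {
    for ((taskId, updates) <- hb.accumUpdates; u <- updates) {
      registered.get(u.id) match {
        case Some(accum) => accum.merge(u.update)
        case None => println(s"Ignoring update for unknown accumulator ${u.id} from task $taskId")
      }
    }
  }
}
```

Because TaskMetrics is now just a view over accumulators, these updates are enough for the driver to rebuild per-task metrics, which is why the heartbeat payload can change from `Array[(Long, TaskMetrics)]` to accumulator updates.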

While an effort has been made to preserve as much of the public API as possible, there were a few known breaking DeveloperApi changes that would be very awkward to maintain. I will gather the full list shortly and post it here.

Note: This was once part of #10717; it is split out into its own patch here to make it easier for others to review. Other smaller pieces of it have already been merged into master.

Author: Andrew Or <andrew@databricks.com>

Closes #10835 from andrewor14/task-metrics-use-accums.
Andrew Or authored and JoshRosen committed Jan 27, 2016
1 parent edd4737 commit 87abcf7
Showing 70 changed files with 3,012 additions and 1,141 deletions.
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -444,13 +444,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
   @Override
   public Option<MapStatus> stop(boolean success) {
     try {
-      // Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
-      Map<String, Accumulator<Object>> internalAccumulators =
-        taskContext.internalMetricsToAccumulators();
-      if (internalAccumulators != null) {
-        internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
-          .add(getPeakMemoryUsedBytes());
-      }
+      taskContext.taskMetrics().incPeakExecutionMemory(getPeakMemoryUsedBytes());

       if (stopping) {
         return Option.apply(null);
86 changes: 63 additions & 23 deletions core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -35,40 +35,67 @@ import org.apache.spark.util.Utils
* [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
* accumulating a set. You will add items to the set, and you will union two sets together.
*
* All accumulators created on the driver to be used on the executors must be registered with
* [[Accumulators]]. This is already done automatically for accumulators created by the user.
* Internal accumulators must be explicitly registered by the caller.
*
* Operations are not thread-safe.
*
* @param id ID of this accumulator; for internal use only.
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
* to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
* thread safe so that they can be reported correctly.
* @param countFailedValues whether to accumulate values from failed tasks. This is set to true
* for system and time metrics like serialization time or bytes spilled,
* and false for things with absolute values like number of input rows.
* This should be used for internal metrics only.
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] private[spark] (
initialValue: R,
class Accumulable[R, T] private (
val id: Long,
@transient initialValue: R,

Review comments on this line:
- yhuai (Contributor, Jan 27, 2016): Seems this needs to be `@transient private val`.
- JoshRosen (Contributor, Jan 27, 2016): Yep, let's hotfix. BTW, I've opened #10608 to make 2.11 the default build.
- andrewor14 (Contributor, Jan 27, 2016): (comment minimized)
param: AccumulableParam[R, T],
val name: Option[String],
internal: Boolean)
internal: Boolean,
private[spark] val countFailedValues: Boolean)
extends Serializable {

private[spark] def this(
@transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = {
this(initialValue, param, None, internal)
initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
internal: Boolean,
countFailedValues: Boolean) = {
this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
}

def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
this(initialValue, param, name, false)
private[spark] def this(
initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
internal: Boolean) = {
this(initialValue, param, name, internal, false /* countFailedValues */)
}

def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None)
def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
this(initialValue, param, name, false /* internal */)

val id: Long = Accumulators.newId
def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)

@volatile @transient private var value_ : R = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
@volatile @transient private var value_ : R = initialValue // Current value on driver
val zero = param.zero(initialValue) // Zero value to be passed to executors
private var deserialized = false

Accumulators.register(this)
// In many places we create internal accumulators without access to the active context cleaner,
// so if we register them here then we may never unregister these accumulators. To avoid memory
// leaks, we require the caller to explicitly register internal accumulators elsewhere.
if (!internal) {
Accumulators.register(this)
}

/**
* If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
@@ -77,6 +104,17 @@ class Accumulable[R, T] private[spark] (
*/
private[spark] def isInternal: Boolean = internal

/**
* Return a copy of this [[Accumulable]].
*
* The copy will have the same ID as the original and will not be registered with
* [[Accumulators]] again. This method exists so that the caller can avoid passing the
* same mutable instance around.
*/
private[spark] def copy(): Accumulable[R, T] = {
new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
}

/**
* Add more data to this accumulator / accumulable
* @param term the data to add
@@ -106,7 +144,7 @@ class Accumulable[R, T] private[spark] (
def merge(term: R) { value_ = param.addInPlace(value_, term)}

/**
* Access the accumulator's current value; only allowed on master.
* Access the accumulator's current value; only allowed on driver.
*/
def value: R = {
if (!deserialized) {
@@ -128,7 +166,7 @@ class Accumulable[R, T] private[spark] (
def localValue: R = value_

/**
* Set the accumulator's value; only allowed on master.
* Set the accumulator's value; only allowed on driver.
*/
def value_= (newValue: R) {
if (!deserialized) {
@@ -139,22 +177,24 @@ class Accumulable[R, T] private[spark] (
}

/**
* Set the accumulator's value; only allowed on master
* Set the accumulator's value. For internal use only.
*/
def setValue(newValue: R) {
this.value = newValue
}
def setValue(newValue: R): Unit = { value_ = newValue }

/**
* Set the accumulator's value. For internal use only.
*/
private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) }

// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true

// Automatically register the accumulator when it is deserialized with the task closure.
//
// Note internal accumulators sent with task are deserialized before the TaskContext is created
// and are registered in the TaskContext constructor. Other internal accumulators, such SQL
// metrics, still need to register here.
// This is for external accumulators and internal ones that do not represent task level
// metrics, e.g. internal SQL metrics, which are per-operator.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
101 changes: 74 additions & 27 deletions core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -17,9 +17,14 @@

package org.apache.spark

import scala.collection.{mutable, Map}
import java.util.concurrent.atomic.AtomicLong
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
import scala.ref.WeakReference

import org.apache.spark.storage.{BlockId, BlockStatus}


/**
* A simpler value of [[Accumulable]] where the result type being accumulated is the same
@@ -49,14 +54,18 @@ import scala.ref.WeakReference
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
* @param name human-readable name associated with this accumulator
* @param internal whether this accumulator is used internally within Spark only
* @param countFailedValues whether to accumulate values from failed tasks
* @tparam T result type
*/
class Accumulator[T] private[spark] (
@transient private[spark] val initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
internal: Boolean)
extends Accumulable[T, T](initialValue, param, name, internal) {
internal: Boolean,
override val countFailedValues: Boolean = false)
extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {

def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
@@ -75,43 +84,63 @@ private[spark] object Accumulators extends Logging {
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
* TODO: Don't use a global map; these should be tied to a SparkContext at the very least.
*/
@GuardedBy("Accumulators")
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()

private var lastId: Long = 0
private val nextId = new AtomicLong(0L)

def newId(): Long = synchronized {
lastId += 1
lastId
}
/**
* Return a globally unique ID for a new [[Accumulable]].
* Note: Once you copy the [[Accumulable]] the ID is no longer unique.
*/
def newId(): Long = nextId.getAndIncrement

/**
* Register an [[Accumulable]] created on the driver such that it can be used on the executors.
*
* All accumulators registered here can later be used as a container for accumulating partial
* values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
* Note: if an accumulator is registered here, it should also be registered with the active
* context cleaner for cleanup so as to avoid memory leaks.
*
* If an [[Accumulable]] with the same ID was already registered, this does nothing instead
* of overwriting it. This happens when we copy accumulators, e.g. when we reconstruct
* [[org.apache.spark.executor.TaskMetrics]] from accumulator updates.
*/
def register(a: Accumulable[_, _]): Unit = synchronized {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
if (!originals.contains(a.id)) {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
}
}

def remove(accId: Long) {
synchronized {
originals.remove(accId)
}
/**
* Unregister the [[Accumulable]] with the given ID, if any.
*/
def remove(accId: Long): Unit = synchronized {
originals.remove(accId)
}

// Add values to the original accumulators with some given IDs
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
/**
* Return the [[Accumulable]] registered with the given ID, if any.
*/
def get(id: Long): Option[Accumulable[_, _]] = synchronized {
originals.get(id).map { weakRef =>
// Since we are storing weak references, we must check whether the underlying data is valid.
weakRef.get.getOrElse {
throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id")
}
}
}

/**
* Clear all registered [[Accumulable]]s. For testing only.
*/
def clear(): Unit = synchronized {
originals.clear()
}

}


@@ -156,5 +185,23 @@ object AccumulatorParam {
def zero(initialValue: Float): Float = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings
// Note: when merging values, this param just adopts the newer value. This is used only
// internally for things that shouldn't really be accumulated across tasks, like input
// read method, which should be the same across all tasks in the same stage.
private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
def addInPlace(t1: String, t2: String): String = t2
def zero(initialValue: String): String = ""
}

// Note: this is expensive as it makes a copy of the list every time the caller adds an item.
// A better way to use this is to first accumulate the values yourself, then add them all at once.
private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
}

// For the internal metric that records what blocks are updated in a particular task
private[spark] object UpdatedBlockStatusesAccumulatorParam
extends ListAccumulatorParam[(BlockId, BlockStatus)]

}
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -57,8 +57,7 @@ case class Aggregator[K, V, C] (
     Option(context).foreach { c =>
       c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
       c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
-      c.internalMetricsToAccumulators(
-        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
+      c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
     }
   }
 }
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
  */
 private[spark] case class Heartbeat(
     executorId: String,
-    taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
+    accumUpdates: Array[(Long, Seq[AccumulableInfo])], // taskId -> accum updates
     blockManagerId: BlockManagerId)

/**
@@ -119,14 +119,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
       context.reply(true)

     // Messages received from executors
-    case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
+    case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
       if (scheduler != null) {
         if (executorLastSeen.contains(executorId)) {
           executorLastSeen(executorId) = clock.getTimeMillis()
           eventLoopThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               val unknownExecutor = !scheduler.executorHeartbeatReceived(
-                executorId, taskMetrics, blockManagerId)
+                executorId, accumUpdates, blockManagerId)
               val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
               context.reply(response)
             }
