SPARK-732: eliminate duplicate update of the accumulator #228

Closed · wants to merge 18 commits
18 changes: 7 additions & 11 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,12 +18,14 @@
package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong

import scala.collection.generic.Growable
import scala.collection.mutable.Map

import org.apache.spark.serializer.JavaSerializer


/**
* A data type that can be accumulated, i.e. has a commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
@@ -237,12 +239,8 @@ private object Accumulators {
// TODO: Use soft references? => need to make readObject work properly then
val originals = Map[Long, Accumulable[_, _]]()
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
var lastId: Long = 0

def newId: Long = synchronized {
lastId += 1
lastId
}
private val nextAccumID = new AtomicLong(0)
def newId(): Long = nextAccumID.getAndIncrement()

def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
if (original) {
@@ -270,11 +268,9 @@ private object Accumulators {
}

// Add values to the original accumulators with some given IDs
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
}
def add(value: (Long, Any)): Unit = synchronized {
if (originals.contains(value._1)) {
originals(value._1).asInstanceOf[Accumulable[Any, Any]] ++= value._2
}
}
}
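
The heart of this hunk, read outside the diff: accumulator IDs now come from a lock-free AtomicLong instead of a synchronized counter, and Accumulators.add merges a single (id, value) pair instead of a whole map of updates. A minimal, self-contained sketch of the same pattern (the IntAccum stand-in and SimpleAccumulators name are illustrative, not Spark's real Accumulable API):

import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Simplified stand-in for an accumulator; Spark's Accumulable is generic and
// serializable, which is omitted here.
class IntAccum(var value: Int) {
  def ++=(other: Int): Unit = { value += other }
}

object SimpleAccumulators {
  private val originals = mutable.Map[Long, IntAccum]()
  private val nextAccumId = new AtomicLong(0)

  // Lock-free ID generation, replacing the synchronized lastId counter.
  def newId(): Long = nextAccumId.getAndIncrement()

  def register(id: Long, acc: IntAccum): Unit = synchronized { originals(id) = acc }

  // Merge one (id, value) pair at a time; IDs that were never registered are
  // ignored, mirroring the originals.contains guard in the patch.
  def add(update: (Long, Int)): Unit = synchronized {
    val (id, value) = update
    originals.get(id).foreach(_ ++= value)
  }
}

Usage sketch: val id = SimpleAccumulators.newId(); SimpleAccumulators.register(id, new IntAccum(0)); SimpleAccumulators.add(id -> 5) leaves the registered accumulator at 5.
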
107 changes: 69 additions & 38 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
import scala.concurrent.duration._
import scala.reflect.ClassTag

@@ -116,6 +116,10 @@ class DAGScheduler(
private val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)

// stageId -> (partitionId -> (accumulatorId, accumulatorValue))
private[scheduler] val stageIdToAccumulators = new HashMap[Int,
HashMap[Int, ListBuffer[(Long, Any)]]]

taskScheduler.setDAGScheduler(this)

/**
@@ -344,6 +348,45 @@ class DAGScheduler(
updateJobIdStageIdMapsList(List(stage))
}

private def removeStage(stageId: Int) {
// data structures based on Stage
for (stage <- stageIdToStage.get(stageId)) {
if (runningStages.contains(stage)) {
logDebug("Removing running stage %d".format(stageId))
runningStages -= stage
}
stageToInfos -= stage
for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
shuffleToMapStage.remove(k)
}
if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
logDebug("Removing pending status for stage %d".format(stageId))
}
pendingTasks -= stage
if (waitingStages.contains(stage)) {
logDebug("Removing stage %d from waiting set.".format(stageId))
waitingStages -= stage
}
if (failedStages.contains(stage)) {
logDebug("Removing stage %d from failed set.".format(stageId))
failedStages -= stage
}
}
// data structures based on StageId
stageIdToStage -= stageId
stageIdToJobIds -= stageId
// Fold the saved accumulator values into the original accumulators. If the
// stage was aborted, its entry was already removed in abortStage(), so its
// accumulator values are never applied.
for (partitionIdToAccum <- stageIdToAccumulators.get(stageId);
accumulators <- partitionIdToAccum.values;
accum <- accumulators) {
Accumulators.add(accum)
}
stageIdToAccumulators -= stageId
logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}

/**
* Removes job and any stages that are not needed by any other job. Returns the set of ids for
* stages that were removed. The associated tasks for those stages need to be cancelled if we
@@ -362,38 +405,6 @@
"Job %d not registered for stage %d even though that stage was registered for the job"
.format(jobId, stageId))
} else {
def removeStage(stageId: Int) {
// data structures based on Stage
for (stage <- stageIdToStage.get(stageId)) {
if (runningStages.contains(stage)) {
logDebug("Removing running stage %d".format(stageId))
runningStages -= stage
}
stageToInfos -= stage
for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
shuffleToMapStage.remove(k)
}
if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
logDebug("Removing pending status for stage %d".format(stageId))
}
pendingTasks -= stage
if (waitingStages.contains(stage)) {
logDebug("Removing stage %d from waiting set.".format(stageId))
waitingStages -= stage
}
if (failedStages.contains(stage)) {
logDebug("Removing stage %d from failed set.".format(stageId))
failedStages -= stage
}
}
// data structures based on StageId
stageIdToStage -= stageId
stageIdToJobIds -= stageId

logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
}

jobSet -= jobId
if (jobSet.isEmpty) { // no other job needs this stage
independentStages += stageId
@@ -407,7 +418,7 @@

private def jobIdToStageIdsRemove(jobId: Int) {
if (!jobIdToStageIds.contains(jobId)) {
logDebug("Trying to remove unregistered job " + jobId)
logWarning("Trying to remove unregistered job " + jobId)
} else {
removeJobAndIndependentStages(jobId)
jobIdToStageIds -= jobId
@@ -787,6 +798,25 @@ class DAGScheduler(
}
}

/**
* Saves the accumulator values reported by a completed task, dropping duplicates:
* values are recorded at most once per (stage, partition), so a duplicate completion
* of the same task (e.g. from a resubmitted stage) cannot update the accumulators twice.
* @param accumValue the accumulator values received from the task
* @param stage the stage to which the task belongs
* @param task the completed task
*/
private def saveAccumulatorValue(accumValue: Map[Long, Any], stage: Stage, task: Task[_]) {
if (accumValue != null &&
(!stageIdToAccumulators.contains(stage.id) ||
!stageIdToAccumulators(stage.id).contains(task.partitionId))) {
val accum = stageIdToAccumulators.getOrElseUpdate(stage.id,
new HashMap[Int, ListBuffer[(Long, Any)]]).
getOrElseUpdate(task.partitionId, new ListBuffer[(Long, Any)])
for ((id, value) <- accumValue) {
accum += id -> value
}
}
}

/**
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -813,9 +843,7 @@
event.reason match {
case Success =>
logInfo("Completed " + task)
if (event.accumUpdates != null) {
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
}
saveAccumulatorValue(event.accumUpdates, stage, task)
pendingTasks(stage) -= task
task match {
case rt: ResultTask[_, _] =>
@@ -924,6 +952,7 @@
}
failedStages += failedStage
failedStages += mapStage
stageIdToAccumulators -= failedStage.id
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, Some(task.epoch))
@@ -1000,7 +1029,6 @@
*/
private def abortStage(failedStage: Stage, reason: String) {
if (!stageIdToStage.contains(failedStage.id)) {
// Skip all the actions if the stage has been removed.
return
}
val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
@@ -1009,6 +1037,9 @@
val job = resultStageToJob(resultStage)
val error = new SparkException("Job aborted: " + reason)
job.listener.jobFailed(error)
// Drop the aborted stage's saved accumulator values so they are not
// applied later when the stage is cleaned up via jobIdToStageIdsRemove()
stageIdToAccumulators -= resultStage.id
jobIdToStageIdsRemove(job.jobId)
jobIdToActiveJob -= resultStage.jobId
activeJobs -= job
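
Taken together, the DAGScheduler changes stop applying accumulator updates eagerly on every task completion. Updates are recorded per (stage, partition) in stageIdToAccumulators, folded into the original accumulators exactly once when the stage is removed, and simply discarded for aborted stages. A rough, self-contained sketch of that bookkeeping, with hypothetical names (AccumDedupSketch, flushStage, discardStage) standing in for the scheduler's internals:

import scala.collection.mutable.{HashMap, ListBuffer}

object AccumDedupSketch {
  // stageId -> (partitionId -> recorded (accumulatorId, value) pairs)
  private val stageIdToAccumulators =
    new HashMap[Int, HashMap[Int, ListBuffer[(Long, Any)]]]

  // Record a task's accumulator updates, keeping only the first completion seen
  // for a given (stage, partition); later duplicates are dropped.
  def saveAccumulatorValue(updates: Map[Long, Any], stageId: Int, partitionId: Int): Unit = {
    if (updates != null &&
        !stageIdToAccumulators.get(stageId).exists(_.contains(partitionId))) {
      val buf = stageIdToAccumulators
        .getOrElseUpdate(stageId, new HashMap[Int, ListBuffer[(Long, Any)]])
        .getOrElseUpdate(partitionId, new ListBuffer[(Long, Any)])
      buf ++= updates
    }
  }

  // When a stage is retired, apply the recorded pairs exactly once and drop them.
  def flushStage(stageId: Int)(apply: ((Long, Any)) => Unit): Unit = {
    for (byPartition <- stageIdToAccumulators.get(stageId);
         pairs <- byPartition.values;
         pair <- pairs) {
      apply(pair)
    }
    stageIdToAccumulators -= stageId
  }

  // For an aborted stage, discard the entry so nothing is ever applied.
  def discardStage(stageId: Int): Unit = stageIdToAccumulators -= stageId
}

Mapped back onto the patch: handleTaskCompletion plays the role of saveAccumulatorValue, removeStage of flushStage (feeding Accumulators.add), and abortStage of discardStage.
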
@@ -568,6 +568,10 @@ private[spark] class TaskSetManager(
failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
logWarning(failureReason)

case ExecutorLostFailure =>
failureReason = "Executor %s lost for TID %s".format(info.executorId, tid)
logWarning(failureReason)

case _ =>
failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
}
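
The TaskSetManager change above adds one branch to the failure-reason match so executor loss is reported explicitly instead of falling into the catch-all. A toy model of that pattern (a hypothetical ADT, not Spark's real TaskEndReason hierarchy):

object FailureReasonSketch {
  sealed trait TaskEnd
  case object ResultLost extends TaskEnd
  final case class ExecutorLost(executorId: String) extends TaskEnd
  case object Unknown extends TaskEnd

  def failureReason(tid: Long, host: String, reason: TaskEnd): String = reason match {
    case ResultLost =>
      "Lost result for TID %s on host %s".format(tid, host)
    case ExecutorLost(execId) =>
      // The new branch: name the lost executor rather than "unknown reason".
      "Executor %s lost for TID %s".format(execId, tid)
    case Unknown =>
      "TID %s on host %s failed for unknown reason".format(tid, host)
  }
}
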
@@ -169,7 +169,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null))
runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
}
}
}

private def completeWithAccumulator(accumId: Long,
taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
assert(taskSet.tasks.size >= results.size)
for ((result, i) <- results.zipWithIndex) {
if (i < taskSet.tasks.size) {
runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
Map[Long, Any]((accumId, 1)), null, null))
}
}
}
@@ -305,17 +316,16 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
runEvent(ExecutorLost("exec-hostA"))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)
val noAccum = Map[Long, Any]()
val taskSet = taskSets(0)
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
// should work because it's a non-failed host
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -407,6 +417,45 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assertDataStructuresEmpty
}

test("accumulator is not calculated for resubmitted stage") {
// created so that the accumulator is registered and has an ID to report
val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
val shuffleOneRdd = makeRdd(2, Nil)
val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
val finalRdd = makeRdd(1, List(shuffleDepTwo))
submit(finalRdd, Array(0))
// have the first stage complete normally
completeWithAccumulator(accum.id, taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
// have the second stage complete normally
completeWithAccumulator(accum.id, taskSets(1), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostC", 1))))
// fail the third stage because hostA went down
completeWithAccumulator(accum.id, taskSets(2), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
scheduler.resubmitFailedStages()
completeWithAccumulator(accum.id, taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
completeWithAccumulator(accum.id, taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
completeWithAccumulator(accum.id, taskSets(5), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assert(Accumulators.originals(accum.id).value === 5)
assertDataStructuresEmpty
}

test("accumulator is cleared for aborted stages") {
// created only so that an accumulator is registered for this test
new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
val rdd = makeRdd(2, Nil)
submit(rdd, Array(0))
failed(taskSets(0), "taskset failed")
assertDataStructuresEmpty
}


/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
@@ -437,5 +486,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(scheduler.runningStages.isEmpty)
assert(scheduler.shuffleToMapStage.isEmpty)
assert(scheduler.waitingStages.isEmpty)
assert(scheduler.stageIdToAccumulators.isEmpty)
}
}
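
For completeness, the user-visible invariant these tests protect, sketched as a tiny standalone job (assumes local mode and the accumulator API of this Spark era; the object name and numbers are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// Each task bumps the accumulator once; with this fix the driver folds each
// partition's update in exactly once, even if a stage is resubmitted after a
// fetch failure.
object AccumulatorOncePerTask {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accum-demo"))
    val acc = sc.accumulator(0)
    sc.parallelize(1 to 100, 4).foreachPartition { _ => acc += 1 }
    assert(acc.value == 4) // one update per partition, never double-counted
    sc.stop()
  }
}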