[SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext

Author: Reynold Xin <rxin@apache.org>

Closes #1772 from rxin/accumulator-dagscheduler and squashes the following commits:

6a58520 [Reynold Xin] [SPARK-2323] Exception in accumulator update should not crash DAGScheduler & SparkContext.
rxin committed Aug 5, 2014
1 parent 9fd82db commit 05bf4e4
Showing 2 changed files with 10 additions and 10 deletions.
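For context, the failure this commit guards against comes from user-defined accumulator merge logic throwing on the driver. Below is a minimal, hypothetical sketch of such a misbehaving accumulator against the Spark 1.x API; the object and exception names are illustrative and not part of this commit.

import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

object MisbehavedAccumulatorExample {
  // An AccumulatorParam whose driver-side merge (addInPlace) throws.
  // Before this commit such an exception could bring down the DAGScheduler's
  // task-completion handling; after it, the error is logged and the job finishes.
  object BrokenParam extends AccumulatorParam[Int] {
    override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
    override def zero(initialValue: Int): Int = 0
    override def addInPlace(r1: Int, r2: Int): Int =
      throw new RuntimeException("user merge logic failed")
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("acc-example"))
    val acc = sc.accumulator(0)(BrokenParam)

    // Each task adds to the accumulator; the per-task results are merged on
    // the driver via addInPlace, which throws.
    sc.parallelize(1 to 10, 2).foreach { _ => acc += 1 }

    // The SparkContext should still be usable afterwards.
    assert(sc.parallelize(1 to 10, 2).count() == 10)
    sc.stop()
  }
}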
@@ -904,8 +904,13 @@ class DAGScheduler(
     event.reason match {
       case Success =>
         if (event.accumUpdates != null) {
-          // TODO: fail the stage if the accumulator update fails...
-          Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+          try {
+            Accumulators.add(event.accumUpdates)
+          } catch {
+            // If we see an exception during accumulator update, just log the error and move on.
+            case e: Exception =>
+              logError(s"Failed to update accumulators for $task", e)
+          }
         }
         stage.pendingTasks -= task
         task match {
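Why the guard sits in the scheduler at all: when a task completes, the driver merges that task's accumulator updates into the registered accumulators, and the merge runs user code (the AccumulatorParam's addInPlace). A rough, simplified sketch of that idea follows; the types and names are hypothetical, not the actual Spark source, whose real logic lives in Accumulators.add as called from the task-completion handling shown in the diff above.

// Simplified illustration of why a user's accumulator exception surfaces on
// the driver, inside the scheduler's task-completion handling.
object DriverSideMergeSketch {
  trait MergeParam[T] { def addInPlace(r1: T, r2: T): T }

  final class Registered[T](var value: T, val param: MergeParam[T])

  // updates: accumulator id -> partial value reported by a finished task
  def add[T](registered: Map[Long, Registered[T]], updates: Map[Long, T]): Unit = {
    for ((id, partial) <- updates; acc <- registered.get(id)) {
      // addInPlace is user code: if it throws, the exception propagates right
      // here, which is the call the try/catch added above now wraps.
      acc.value = acc.param.addInPlace(acc.value, partial)
    }
  }
}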
@@ -622,8 +622,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
-  // TODO: Fix this and un-ignore the test.
-  ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
+  test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
     val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
       override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
       override def zero(initialValue: Int): Int = 0
@@ -633,14 +632,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     })
 
     // Run this on executors
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
-    }
+    sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
 
     // Run this within a local thread
-    intercept[SparkDriverExecutionException] {
-      sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
-    }
+    sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
 
     // Make sure we can still run local commands as well as cluster commands.
     assert(sc.parallelize(1 to 10, 2).count() === 10)