[SPARK-13408] [CORE] Ignore errors when it's already reported in JobWaiter

## What changes were proposed in this pull request?

`JobWaiter.taskSucceeded` is called once per task. When `resultHandler` throws an exception, `taskSucceeded` rethrows it for every task, and DAGScheduler just catches it and reports it like this:
```Scala
try {
  job.listener.taskSucceeded(rt.outputId, event.result)
} catch {
  case e: Exception =>
    // TODO: Perhaps we want to mark the resultStage as failed?
    job.listener.jobFailed(new SparkDriverExecutionException(e))
}
```
Therefore `JobWaiter.jobFailed` may be called multiple times.
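
A minimal sketch of that failure mode (the object and handler here are hypothetical, not code from Spark):

```Scala
import scala.concurrent.Promise

// Hypothetical sketch: mimic DAGScheduler's per-task handling with a
// result handler that always throws. The failure path then runs once per
// task rather than once per job.
object RepeatedJobFailedDemo extends App {
  val resultHandler: (Int, String) => Unit =
    (taskIndex, _) => throw new RuntimeException(s"handler failed for task $taskIndex")

  val jobPromise = Promise[Unit]()

  for (taskIndex <- 0 until 3) {
    try resultHandler(taskIndex, "task result")
    catch {
      case e: Exception =>
        // Promise.failure would throw IllegalStateException on the second
        // iteration; tryFailure tolerates the repeated calls.
        jobPromise.tryFailure(e)
    }
  }

  println(jobPromise.future.value) // Some(Failure(... task 0 ...))
}
```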

So `JobWaiter.jobFailed` should use `Promise.tryFailure` instead of `Promise.failure`, because `Promise.failure` throws an `IllegalStateException` when the promise has already been completed, while `tryFailure` can safely be called multiple times.
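
For reference, a small sketch of the difference between the two `Promise` methods (standard Scala library behavior; `PromiseFailureDemo` is a made-up name):

```Scala
import scala.concurrent.Promise

object PromiseFailureDemo extends App {
  val p = Promise[Unit]()

  // The first failure completes the promise.
  p.failure(new RuntimeException("first"))

  // A promise can be completed only once: a second failure(...) throws.
  try p.failure(new RuntimeException("second"))
  catch { case e: IllegalStateException => println(s"second failure threw: $e") }

  // tryFailure is safe to call again; it signals rejection via its result.
  val accepted = p.tryFailure(new RuntimeException("third"))
  println(s"tryFailure accepted: $accepted") // false
  println(p.future.value) // Some(Failure(java.lang.RuntimeException: first))
}
```

This is exactly the situation `jobFailed` ends up in: only the first reported exception should complete the job's promise, and later reports should be ignored (and, after this patch, logged).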

## How was this patch tested?

Jenkins tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11280 from zsxwing/SPARK-13408.
zsxwing authored and davies committed Feb 20, 2016
1 parent 6624a58 commit dfb2ae2
Showing 2 changed files with 49 additions and 3 deletions.
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
```diff
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.{Future, Promise}
 
+import org.apache.spark.Logging
+
 /**
  * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
  * results to the given handler function.
@@ -30,7 +32,7 @@ private[spark] class JobWaiter[T](
     val jobId: Int,
     totalTasks: Int,
     resultHandler: (Int, T) => Unit)
-  extends JobListener {
+  extends JobListener with Logging {
 
   private val finishedTasks = new AtomicInteger(0)
   // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
@@ -61,7 +63,10 @@ private[spark] class JobWaiter[T](
     }
   }
 
-  override def jobFailed(exception: Exception): Unit =
-    jobPromise.failure(exception)
+  override def jobFailed(exception: Exception): Unit = {
+    if (!jobPromise.tryFailure(exception)) {
+      logWarning("Ignore failure", exception)
+    }
+  }
 
 }
```
41 changes: 41 additions & 0 deletions core/src/test/scala/org/apache/spark/scheduler/JobWaiterSuite.scala
```diff
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.util.Failure
+
+import org.apache.spark.SparkFunSuite
+
+class JobWaiterSuite extends SparkFunSuite {
+
+  test("call jobFailed multiple times") {
+    val waiter = new JobWaiter[Int](null, 0, totalTasks = 2, null)
+
+    // Should not throw exception if calling jobFailed multiple times
+    waiter.jobFailed(new RuntimeException("Oops 1"))
+    waiter.jobFailed(new RuntimeException("Oops 2"))
+    waiter.jobFailed(new RuntimeException("Oops 3"))
+
+    waiter.completionFuture.value match {
+      case Some(Failure(e)) =>
+        // We should receive the first exception
+        assert("Oops 1" === e.getMessage)
+      case other => fail("Should receive the first exception but it was " + other)
+    }
+  }
+}
```
