Skip to content

Commit

Permalink
Propagate an exception in case of failure.
Browse files Browse the repository at this point in the history
  • Loading branch information
dragos committed Jan 26, 2016
1 parent 997721c commit 639ef36
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ private[spark] class CoarseMesosSchedulerBackend(

override def error(d: SchedulerDriver, message: String) {
logError(s"Mesos error: $message")
markErr()
scheduler.error(message)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
registerLatch.await()
return
}
@volatile
var error: Option[Exception] = None

// We create a new thread that will block inside `mesosDriver.run`
// until the scheduler exists
new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") {
setDaemon(true)
override def run() {
Expand All @@ -115,17 +119,24 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
val ret = mesosDriver.run()
logInfo("driver.run() returned with code " + ret)
if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
logError("Error starting driver, DRIVER_ABORTED")
error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
markErr()
}
} catch {
case e: Exception => {
logError("driver.run() failed", e)
error = Some(e)
markErr()
}
}
}
}.start()

registerLatch.await()

// propagate any error to the calling thread. This ensures that SparkContext creation fails
// without leaving a broken context that won't be able to schedule any tasks
error.foreach(throw _)
}
}

Expand Down

0 comments on commit 639ef36

Please sign in to comment.