Skip to content

Commit

Permalink
[SPARK-12265][MESOS] Spark calls System.exit inside driver instead of…
Browse files Browse the repository at this point in the history
… throwing exception

This takes over #10729 and makes sure that `spark-shell` fails with a proper error message. There is a slight behavioral change: before this change `spark-shell` would exit, while now the REPL is still there, but `sc` and `sqlContext` are not defined and the error is visible to the user.

Author: Nilanjan Raychaudhuri <nraychaudhuri@gmail.com>
Author: Iulian Dragos <jaguarul@gmail.com>

Closes #10921 from dragos/pr/10729.
  • Loading branch information
nraychaudhuri authored and Andrew Or committed Feb 1, 2016
1 parent 51b03b7 commit a41b68b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ private[spark] class MesosClusterScheduler(
// Intentionally a no-op: nothing is done here when a slave is reported lost.
override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = ()
/**
 * Handles an error message delivered together with the given scheduler driver.
 *
 * The message is written to the error log and `markErr()` is then invoked.
 * NOTE(review): `markErr()` is presumably the MesosSchedulerUtils helper that
 * releases the registration latch -- confirm against the enclosing class.
 *
 * @param driver the scheduler driver passed in with the error (not used here)
 * @param error  the error text to log
 */
override def error(driver: SchedulerDriver, error: String): Unit = {
  logError("Error received: " + error)
  markErr()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ private[spark] class MesosSchedulerBackend(
/**
 * Handles an error message delivered together with the given scheduler driver.
 *
 * Everything runs inside `inClassLoader()`: the message is logged, `markErr()`
 * is called, and the message is then forwarded to `scheduler.error`.
 *
 * The result type is spelled out as `Unit` instead of relying on procedure
 * syntax, matching the other scheduler callbacks that declare `: Unit =`.
 *
 * @param d       the scheduler driver passed in with the error (not used here)
 * @param message the error text to log and forward
 */
override def error(d: SchedulerDriver, message: String): Unit = {
  inClassLoader() {
    logError("Mesos error: " + message)
    // markErr() is called before the message is handed to the task scheduler.
    markErr()
    scheduler.error(message)
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,28 +106,37 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
registerLatch.await()
return
}
@volatile
var error: Option[Exception] = None

// We create a new thread that will block inside `mesosDriver.run`
// until the scheduler exits
new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") {
setDaemon(true)

override def run() {
mesosDriver = newDriver
try {
mesosDriver = newDriver
val ret = mesosDriver.run()
logInfo("driver.run() returned with code " + ret)
if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
System.exit(1)
error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
markErr()
}
} catch {
case e: Exception => {
logError("driver.run() failed", e)
System.exit(1)
error = Some(e)
markErr()
}
}
}
}.start()

registerLatch.await()

// propagate any error to the calling thread. This ensures that SparkContext creation fails
// without leaving a broken context that won't be able to schedule any tasks
error.foreach(throw _)
}
}

Expand All @@ -144,6 +153,10 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
registerLatch.countDown()
}

/**
 * Signals that driver startup failed by counting down `registerLatch` once.
 *
 * Code that starts the driver blocks on `registerLatch.await()`; counting the
 * latch down here lets that wait return on failure as well, so the caller can
 * inspect and rethrow the recorded error rather than wait indefinitely.
 */
protected def markErr(): Unit = registerLatch.countDown()

def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
val builder = Resource.newBuilder()
.setName(name)
Expand Down

0 comments on commit a41b68b

Please sign in to comment.