From 1deed38a9cf7d1b4c34074db1affa32d693bbd61 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 13 Jul 2015 20:08:09 -0700 Subject: [PATCH] Add some comments --- core/src/main/scala/org/apache/spark/FutureAction.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 307901516db4e..0fba6e8f1c7fd 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -108,7 +108,11 @@ trait FutureAction[T] extends Future[T] { class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T) extends FutureAction[T] { + // Note: `resultFunc` is a closure which may contain references to state that's updated by the + // JobWaiter's result handler function. It should only be evaluated once the job has succeeded. + @volatile private var _cancelled: Boolean = false + // Null until the job has completed, then holds a Try representing success or failure. @volatile private var _value: Try[T] = null override def cancel() { @@ -117,18 +121,22 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: } override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = { + // This call to the JobWaiter's future will throw an exception if the job failed. jobWaiter.toFuture.ready(atMost)(permit) this } @throws(classOf[Exception]) override def result(atMost: Duration)(implicit permit: CanAwait): T = { + // This call to the JobWaiter's future will throw an exception if the job failed. jobWaiter.toFuture.result(atMost)(permit) + // At this point, we know that the job succeeded so it's safe to evaluate this function: resultFunc } override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = { jobWaiter.toFuture.onComplete { (jobWaiterResult: Try[Unit]) => + // If the job succeeded, then evaluate the result function; otherwise, preserve the exception. _value = jobWaiterResult.map(_ => resultFunc) func(_value) }