From 1346313c38d8009d849399b654913b95b3646ff5 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 14 Jul 2015 10:54:41 -0700 Subject: [PATCH] Use lazy val to make ot clear that resultfunc should only be evaluated once --- .../src/main/scala/org/apache/spark/FutureAction.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 5c27d078d3ac9..7a3d29f25d522 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -113,6 +113,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: @volatile private var _cancelled: Boolean = false private[this] val jobWaiterFuture: Future[Unit] = jobWaiter.toFuture + private[this] lazy val resultFuncOutput: T = { + assert(isCompleted, "resultFunc should only be evaluated after the job has completed") + resultFunc + } override def cancel() { _cancelled = true @@ -127,11 +131,11 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: @throws(classOf[Exception]) override def result(atMost: Duration)(implicit permit: CanAwait): T = { jobWaiterFuture.result(atMost)(permit) // Throws exception if the job failed. - resultFunc // This function is safe to evaluate because the job must have succeeded. + resultFuncOutput // This function is safe to evaluate because the job must have succeeded. } override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = { - jobWaiterFuture.map { _ => resultFunc }.onComplete(func) + jobWaiterFuture.map { _ => resultFuncOutput }.onComplete(func) } override def isCompleted: Boolean = jobWaiter.jobFinished @@ -143,7 +147,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: None } else { jobWaiter.awaitResult() match { - case JobSucceeded => Some(scala.util.Success(resultFunc)) + case JobSucceeded => Some(scala.util.Success(resultFuncOutput)) case JobFailed(e) => Some(scala.util.Failure(e)) } }