-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-9026] [SPARK-4514] Modifications to JobWaiter, FutureAction, and AsyncRDDActions to support non-blocking operation #9264
Changes from 6 commits
4210aa6
1ad1abd
de008ce
17f6988
4def989
93b3065
dcd2883
8624ef0
31de51b
0a6c614
2c2da36
cef7637
e364f93
0dd1128
50cd13e
7f4244d
dddad7f
19ef962
d1a905d
c63ac17
489aabc
c19b3c0
601bb95
9da3533
a8ba899
38b1442
5816489
8fe8000
539ac43
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,11 +22,11 @@ import java.util.concurrent.TimeUnit | |
|
||
import org.apache.spark.api.java.JavaFutureAction | ||
import org.apache.spark.rdd.RDD | ||
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter} | ||
import org.apache.spark.scheduler.JobWaiter | ||
|
||
import scala.concurrent._ | ||
import scala.concurrent.duration.Duration | ||
import scala.util.{Failure, Try} | ||
import scala.util.Try | ||
|
||
/** | ||
* A future for the result of an action to support cancellation. This is an extension of the | ||
|
@@ -116,57 +116,26 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: | |
} | ||
|
||
override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = { | ||
if (!atMost.isFinite()) { | ||
awaitResult() | ||
} else jobWaiter.synchronized { | ||
val finishTime = System.currentTimeMillis() + atMost.toMillis | ||
while (!isCompleted) { | ||
val time = System.currentTimeMillis() | ||
if (time >= finishTime) { | ||
throw new TimeoutException | ||
} else { | ||
jobWaiter.wait(finishTime - time) | ||
} | ||
} | ||
} | ||
jobWaiter.completionFuture.ready(atMost) | ||
this | ||
} | ||
|
||
@throws(classOf[Exception]) | ||
override def result(atMost: Duration)(implicit permit: CanAwait): T = { | ||
ready(atMost)(permit) | ||
awaitResult() match { | ||
case scala.util.Success(res) => res | ||
case scala.util.Failure(e) => throw e | ||
} | ||
jobWaiter.completionFuture.ready(atMost) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. |
||
value.get.get | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just to be safe, can you add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That seems sort of redundant, given that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very well. I'll add an assertion. |
||
} | ||
|
||
override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext) { | ||
executor.execute(new Runnable { | ||
override def run() { | ||
func(awaitResult()) | ||
} | ||
}) | ||
jobWaiter.completionFuture onComplete {_ => func(value.get)} | ||
} | ||
|
||
override def isCompleted: Boolean = jobWaiter.jobFinished | ||
|
||
override def isCancelled: Boolean = _cancelled | ||
|
||
override def value: Option[Try[T]] = { | ||
if (jobWaiter.jobFinished) { | ||
Some(awaitResult()) | ||
} else { | ||
None | ||
} | ||
} | ||
|
||
private def awaitResult(): Try[T] = { | ||
jobWaiter.awaitResult() match { | ||
case JobSucceeded => scala.util.Success(resultFunc) | ||
case JobFailed(e: Exception) => scala.util.Failure(e) | ||
} | ||
} | ||
override def value: Option[Try[T]] = | ||
jobWaiter.completionFuture.value map {res => res map {_ => resultFunc}} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
|
||
def jobIds: Seq[Int] = Seq(jobWaiter.jobId) | ||
} | ||
|
@@ -179,44 +148,25 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: | |
*/ | ||
class ComplexFutureAction[T] extends FutureAction[T] { | ||
|
||
// Pointer to the thread that is executing the action. It is set when the action is run. | ||
@volatile private var thread: Thread = _ | ||
|
||
// A flag indicating whether the future has been cancelled. This is used in case the future | ||
// is cancelled before the action was even run (and thus we have no thread to interrupt). | ||
@volatile private var _cancelled: Boolean = false | ||
@volatile private var _cancelled = false | ||
|
||
@volatile private var jobs: Seq[Int] = Nil | ||
@volatile private var subActions: List[FutureAction[_]] = Nil | ||
|
||
// A promise used to signal the future. | ||
private val p = promise[T]() | ||
private val p = Promise[T]() | ||
|
||
override def cancel(): Unit = this.synchronized { | ||
override def cancel(): Unit = synchronized { | ||
_cancelled = true | ||
if (thread != null) { | ||
thread.interrupt() | ||
} | ||
p.tryFailure(new SparkException("Action has been cancelled")) | ||
subActions foreach {_.cancel()} | ||
} | ||
|
||
/** | ||
* Executes some action enclosed in the closure. To properly enable cancellation, the closure | ||
* should use runJob implementation in this promise. See takeAsync for example. | ||
*/ | ||
def run(func: => T)(implicit executor: ExecutionContext): this.type = { | ||
scala.concurrent.future { | ||
thread = Thread.currentThread | ||
try { | ||
p.success(func) | ||
} catch { | ||
case e: Exception => p.failure(e) | ||
} finally { | ||
// This lock guarantees when calling `thread.interrupt()` in `cancel`, | ||
// thread won't be set to null. | ||
ComplexFutureAction.this.synchronized { | ||
thread = null | ||
} | ||
} | ||
} | ||
def run(func: => Future[T])(implicit executor: ExecutionContext): this.type = { | ||
p tryCompleteWith func | ||
this | ||
} | ||
|
||
|
@@ -229,28 +179,15 @@ class ComplexFutureAction[T] extends FutureAction[T] { | |
processPartition: Iterator[T] => U, | ||
partitions: Seq[Int], | ||
resultHandler: (Int, U) => Unit, | ||
resultFunc: => R) { | ||
resultFunc: => R)(implicit executor: ExecutionContext) : FutureAction[R] = synchronized { | ||
// If the action hasn't been cancelled yet, submit the job. The check and the submitJob | ||
// command need to be in an atomic block. | ||
val job = this.synchronized { | ||
if (!isCancelled) { | ||
rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) | ||
} else { | ||
throw new SparkException("Action has been cancelled") | ||
} | ||
} | ||
|
||
this.jobs = jobs ++ job.jobIds | ||
|
||
// Wait for the job to complete. If the action is cancelled (with an interrupt), | ||
// cancel the job and stop the execution. This is not in a synchronized block because | ||
// Await.ready eventually waits on the monitor in FutureJob.jobWaiter. | ||
try { | ||
Await.ready(job, Duration.Inf) | ||
} catch { | ||
case e: InterruptedException => | ||
job.cancel() | ||
throw new SparkException("Action has been cancelled") | ||
if (!isCancelled) { | ||
val job = rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc) | ||
subActions = job::subActions | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Space around `::`? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think I've ever seen it done that way, and ScalaStyle doesn't seem to care. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hm, really? All of the Spark code I see is written this way. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I actually haven't seen Spark code that uses […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Really? I can't find any that don't use spaces, after skimming over the first 1000 hits. I can't recall seeing Scala code without spaces around this operator. *shrug* I'd match the surrounding project code here in matters of style/taste. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I searched for "ml cons operator" in Google, and out of all the hits on the first page that included (ML, SML, or F#) code samples, all but one omitted the spaces. Anyway, I've inserted spaces for the sake of consistency. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We're talking about a Scala operator in Scala code here, though, so I don't know that these other examples mean much. See the Scala doc. http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.List |
||
job | ||
} else { | ||
throw new SparkException("Action has been cancelled") | ||
} | ||
} | ||
|
||
|
@@ -276,7 +213,7 @@ class ComplexFutureAction[T] extends FutureAction[T] { | |
|
||
override def value: Option[Try[T]] = p.future.value | ||
|
||
def jobIds: Seq[Int] = jobs | ||
def jobIds: Seq[Int] = subActions flatMap {_.jobIds} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
} | ||
|
||
|
@@ -303,7 +240,7 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S | |
Await.ready(futureAction, timeout) | ||
futureAction.value.get match { | ||
case scala.util.Success(value) => converter(value) | ||
case Failure(exception) => | ||
case scala.util.Failure(exception) => | ||
if (isCancelled) { | ||
throw new CancellationException("Job cancelled").initCause(exception) | ||
} else { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,10 +22,10 @@ import java.util.concurrent.atomic.AtomicLong | |
import org.apache.spark.util.ThreadUtils | ||
|
||
import scala.collection.mutable.ArrayBuffer | ||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.{Future, ExecutionContext} | ||
import scala.reflect.ClassTag | ||
|
||
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging} | ||
import org.apache.spark.{SimpleFutureAction, ComplexFutureAction, FutureAction, Logging} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: ordering inside {} |
||
|
||
/** | ||
* A set of asynchronous RDD actions available through an implicit conversion. | ||
|
@@ -66,14 +66,22 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi | |
*/ | ||
def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope { | ||
val f = new ComplexFutureAction[Seq[T]] | ||
|
||
f.run { | ||
// This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which | ||
// is a cached thread pool. | ||
val results = new ArrayBuffer[T](num) | ||
val totalParts = self.partitions.length | ||
var partsScanned = 0 | ||
while (results.size < num && partsScanned < totalParts) { | ||
// Cached thread pool to handle aggregation of subtasks. | ||
implicit val executionContext = AsyncRDDActions.futureExecutionContext | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not necessary for this patch. But I think we can remove |
||
val results = new ArrayBuffer[T](num) | ||
val totalParts = self.partitions.length | ||
|
||
/* | ||
Recursively triggers jobs to scan partitions until either the requested | ||
number of elements are retrieved, or the partitions to scan are exhausted. | ||
This implementation is non-blocking, asynchronously handling the | ||
results of each job and triggering the next job using callbacks on futures. | ||
*/ | ||
def continue(partsScanned : Int) : Future[Seq[T]] = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lots of style violations:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Such as? Most of the body of this function was existing code that I just moved into here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please review the link I just posted |
||
if (results.size >= num || partsScanned >= totalParts) { | ||
Future.successful(results.toSeq) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
else { | ||
// The number of partitions to try in this iteration. It is ok for this number to be | ||
// greater than totalParts because we actually cap it at totalParts in runJob. | ||
var numPartsToTry = 1 | ||
|
@@ -95,19 +103,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi | |
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts) | ||
|
||
val buf = new Array[Array[T]](p.size) | ||
f.runJob(self, | ||
val job = f.runJob(self, | ||
(it: Iterator[T]) => it.take(left).toArray, | ||
p, | ||
(index: Int, data: Array[T]) => buf(index) = data, | ||
Unit) | ||
|
||
buf.foreach(results ++= _.take(num - results.size)) | ||
partsScanned += numPartsToTry | ||
job flatMap {case _ => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
buf.foreach(results ++= _.take(num - results.size)) | ||
continue(partsScanned + numPartsToTry) | ||
} | ||
} | ||
results.toSeq | ||
}(AsyncRDDActions.futureExecutionContext) | ||
|
||
f | ||
f.run {continue(0)} | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,10 @@ import java.util.Properties | |
import java.util.concurrent.TimeUnit | ||
import java.util.concurrent.atomic.AtomicInteger | ||
|
||
import org.apache.spark.Success | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: ordering |
||
|
||
import scala.collection.Map | ||
import scala.concurrent.Await | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you order these imports? here and other places There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
import scala.collection.mutable.{HashMap, HashSet, Stack} | ||
import scala.concurrent.duration._ | ||
import scala.language.existentials | ||
|
@@ -609,11 +612,12 @@ class DAGScheduler( | |
properties: Properties): Unit = { | ||
val start = System.nanoTime | ||
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) | ||
waiter.awaitResult() match { | ||
case JobSucceeded => | ||
Await.ready(waiter.completionFuture, atMost = Duration.Inf) | ||
waiter.completionFuture.value.get match { | ||
case scala.util.Success(_) => | ||
logInfo("Job %d finished: %s, took %f s".format | ||
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) | ||
case JobFailed(exception: Exception) => | ||
case scala.util.Failure(exception) => | ||
logInfo("Job %d failed: %s, took %f s".format | ||
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) | ||
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,10 @@ | |
|
||
package org.apache.spark.scheduler | ||
|
||
import java.util.concurrent.atomic.AtomicInteger | ||
|
||
import scala.concurrent.{Future, Promise} | ||
|
||
/** | ||
* An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their | ||
* results to the given handler function. | ||
|
@@ -28,17 +32,15 @@ private[spark] class JobWaiter[T]( | |
resultHandler: (Int, T) => Unit) | ||
extends JobListener { | ||
|
||
private var finishedTasks = 0 | ||
|
||
// Is the job as a whole finished (succeeded or failed)? | ||
@volatile | ||
private var _jobFinished = totalTasks == 0 | ||
|
||
def jobFinished: Boolean = _jobFinished | ||
|
||
private val finishedTasks = new AtomicInteger(0) | ||
// If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero | ||
// partition RDDs), we set the jobResult directly to JobSucceeded. | ||
private var jobResult: JobResult = if (jobFinished) JobSucceeded else null | ||
private val jobPromise : Promise[Unit] = | ||
if (totalTasks == 0) Promise.successful(()) else Promise() | ||
|
||
def jobFinished: Boolean = jobPromise.isCompleted | ||
|
||
def completionFuture : Future[Unit] = jobPromise.future | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. no space before `:` There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
|
||
/** | ||
* Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled | ||
|
@@ -49,29 +51,14 @@ private[spark] class JobWaiter[T]( | |
dagScheduler.cancelJob(jobId) | ||
} | ||
|
||
override def taskSucceeded(index: Int, result: Any): Unit = synchronized { | ||
if (_jobFinished) { | ||
throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter") | ||
} | ||
override def taskSucceeded(index: Int, result: Any): Unit = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. note that this does change behavior because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... I see what you mean. While nothing within There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder... do we need to also put some sort of synchronization around |
||
resultHandler(index, result.asInstanceOf[T]) | ||
finishedTasks += 1 | ||
if (finishedTasks == totalTasks) { | ||
_jobFinished = true | ||
jobResult = JobSucceeded | ||
this.notifyAll() | ||
if (finishedTasks.incrementAndGet() == totalTasks) { | ||
jobPromise.success(()) | ||
} | ||
} | ||
|
||
override def jobFailed(exception: Exception): Unit = synchronized { | ||
_jobFinished = true | ||
jobResult = JobFailed(exception) | ||
this.notifyAll() | ||
} | ||
override def jobFailed(exception: Exception): Unit = | ||
jobPromise.failure(exception) | ||
|
||
def awaitResult(): JobResult = synchronized { | ||
while (!_jobFinished) { | ||
this.wait() | ||
} | ||
return jobResult | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,7 @@ package org.apache.spark.rdd | |
|
||
import java.util.concurrent.Semaphore | ||
|
||
import scala.concurrent.{Await, TimeoutException} | ||
import scala.concurrent._ | ||
import scala.concurrent.duration.Duration | ||
import scala.concurrent.ExecutionContext.Implicits.global | ||
|
||
|
@@ -197,4 +197,30 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim | |
Await.result(f, Duration(20, "milliseconds")) | ||
} | ||
} | ||
|
||
test("SimpleFutureAction callback must not consume a thread while waiting") { | ||
val executorInvoked = Promise[Unit] | ||
val fakeExecutionContext = new ExecutionContext { | ||
override def execute(runnable: Runnable): Unit = { | ||
executorInvoked.success(()) | ||
} | ||
override def reportFailure(t: Throwable): Unit = ??? | ||
} | ||
val f = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}).countAsync() | ||
f.onComplete(_ => ())(fakeExecutionContext) | ||
assert(!executorInvoked.isCompleted) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If I understand right, the idea of this test is that you set up some really slow transformations, so that the job won't actually complete before you get to this assert? At the very least, let's add a comment saying that. I worry that this will become a flaky test ... though I can't immediately think of an alternative. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I tried two other approaches (one using semaphores, one using actors) to come up with a "non-flaky" solution, but neither worked because the tasks get serialized and deserialized, even when running in local mode. It seems that "Thread.sleep" is the only viable approach, as ugly as it is. The async action should just go away after the job completes. I'll add a comment. |
||
} | ||
|
||
test("ComplexFutureAction callback must not consume a thread while waiting") { | ||
val executorInvoked = Promise[Unit] | ||
val fakeExecutionContext = new ExecutionContext { | ||
override def execute(runnable: Runnable): Unit = { | ||
executorInvoked.success(()) | ||
} | ||
override def reportFailure(t: Throwable): Unit = ??? | ||
} | ||
val f = sc.parallelize(1 to 100, 4).mapPartitions(itr => {Thread.sleep(1000L); itr}).takeAsync(100) | ||
f.onComplete(_ => ())(fakeExecutionContext) | ||
assert(!executorInvoked.isCompleted) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I know you didn't introduce this, but as long as you're touching it -- the scala imports should go above the spark imports (still in their own group)