[SPARK-9026] Refactor SimpleFutureAction.onComplete to not launch separate thread for every callback #7385
@zsxwing, TD and I think that this patch's fix may eliminate the need to introduce a new submitAsyncJob API. I'm going to push another commit in a few minutes to add some code comments plus more tests, but I just wanted to submit this PR now to get your initial feedback on the approach.
private var finishedTasks = 0

// Is the job as a whole finished (succeeded or failed)?
@volatile
private var _jobFinished = totalTasks == 0

if (_jobFinished) {
@zsxwing, this `if` statement fixes a subtle bug that we found in your JobWaiter future: if a job has no tasks, then it is marked as finished immediately and `taskSucceeded` will not be called, so we need to complete the promise here. We noticed this because a test in AsyncRDDActionsSuite was hanging.
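A minimal sketch of the zero-task case described above, assuming a simplified waiter built on `scala.concurrent.Promise`. `SketchJobWaiter` is a hypothetical stand-in, not the `JobWaiter` from this patch; it only illustrates why the promise has to be completed in the constructor when there are no tasks.

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical, simplified stand-in for JobWaiter (not the class from the patch).
class SketchJobWaiter(totalTasks: Int) {
  private val promise = Promise[Unit]()
  private var finishedTasks = 0

  // A job with no tasks is finished from the start, so taskSucceeded()
  // never runs; the promise must be completed here or callers hang forever.
  if (totalTasks == 0) {
    promise.success(())
  }

  def taskSucceeded(): Unit = synchronized {
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      // trySuccess is a no-op if the promise has already been completed.
      promise.trySuccess(())
    }
  }

  def toFuture: Future[Unit] = promise.future
}
```

Without the constructor check, `new SketchJobWaiter(0).toFuture` would never complete, which matches the hanging-test symptom mentioned in the comment.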
7b25e6b to d779af8
@tdas, PTAL @ the regression test that I added.
Test build #37179 has finished for PR 7385 at commit
Test build #37184 has finished for PR 7385 at commit
Ah, interesting: it looks like this failed some of the JavaAsyncRDDActions tests. I'll investigate.
Test build #37188 has finished for PR 7385 at commit
override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
  jobWaiter.toFuture.onComplete { (jobWaiterResult: Try[Unit]) =>
    // If the job succeeded, then evaluate the result function; otherwise, preserve the exception.
    _value = jobWaiterResult.map(_ => resultFunc)
@tdas, I think there's a bug here because we'll re-assign to `_value` if there are multiple `onComplete`s. There's also a race in allowing `_value` to be assigned here, since there's a lag between when the `jobWaiter` future completes and when this callback runs. Fixing this now...
@JoshRosen thank you for fixing this. Actually, I have another requirement for
I cannot access
@zsxwing will
Forgot it. It should work. Thanks.
  }
})
override def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit = {
  jobWaiterFuture.map { _ => resultFunc }.onComplete(func)
How about adding `private lazy val _resultFunc = resultFunc` and using `_resultFunc` in this class? This should avoid calling `resultFunc` multiple times.
I think that @tdas and I considered this and ended up not doing it because we thought that `resultFunc` would only be computed once, but I suppose it doesn't hurt to be more explicit here. Even if the current code works, if it's confusing enough to merit a comment then I think we should just be explicit and use a lazy val. I'll update this now to do this.
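To make the `lazy val` suggestion concrete, here is a small sketch assuming `resultFunc` is a by-name parameter that is mapped over the job's future on every call. `SketchFutureAction` and `LazyResultDemo` are hypothetical names used only for illustration; they are not the classes in this PR.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for SimpleFutureAction; resultFunc is passed by name.
class SketchFutureAction[T](jobDone: Future[Unit], resultFunc: => T) {
  // Caches the by-name argument: it is evaluated at most once, on first access.
  private lazy val _resultFunc: T = resultFunc

  // Each call builds a new mapped future, but all of them share the cached value.
  def toFuture(implicit ec: ExecutionContext): Future[T] =
    jobDone.map(_ => _resultFunc)
}

object LazyResultDemo {
  // Returns (number of times resultFunc was evaluated, the result value).
  def run(): (Int, Int) = {
    import ExecutionContext.Implicits.global
    val evaluations = new AtomicInteger(0)
    val action = new SketchFutureAction[Int](
      Future.successful(()),
      { evaluations.incrementAndGet(); 42 })
    val first = Await.result(action.toFuture, 5.seconds)
    val second = Await.result(action.toFuture, 5.seconds)
    assert(first == second)
    (evaluations.get(), first)
  }
}
```

If `toFuture` referred to `resultFunc` directly, every mapped future would re-evaluate the by-name argument; going through the `lazy val` makes the single evaluation explicit.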
Test build #37200 has finished for PR 7385 at commit
@zsxwing I've updated this to use a
Test build #37245 has finished for PR 7385 at commit
  case JobSucceeded => scala.util.Success(resultFunc)
  case JobFailed(e: Exception) => scala.util.Failure(e)
} else {
  jobWaiter.awaitResult() match {
Using `awaitResult` to get the result here seems like a bad hack. Rather, there should be a `JobWaiter.jobResult` (make it public) that returns `Option[JobResult]`, and this code should use that.
I just pushed a commit which changes JobWaiter to extend
Test build #37319 has finished for PR 7385 at commit
  throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
}
resultHandler(index, result.asInstanceOf[T])
finishedTasks += 1
if (finishedTasks == totalTasks) {
  _jobFinished = true
  jobResult = JobSucceeded
  promise.trySuccess()
  this.notifyAll()
This line can be removed. Right?
Test build #37677 has finished for PR 7385 at commit
Jenkins, retest this please.
Test build #37706 has finished for PR 7385 at commit
Jenkins, retest this please.
Test build #37745 has finished for PR 7385 at commit
// There are certain situations where jobFailed can be called multiple times for the same
// job. We guard against this by making this method idempotent.
if (!isCompleted) {
  promise.failure(exception)
Looks like `tryFailure` would be simpler.
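For context on this suggestion: in the Scala standard library, `Promise.failure` throws an `IllegalStateException` if the promise is already completed, whereas `Promise.tryFailure` returns `false` in that case. A minimal sketch of an idempotent failure handler follows; `IdempotentFailureDemo` and its `jobFailed` helper are hypothetical and only mirror the shape of the method under review.

```scala
import scala.concurrent.Promise

object IdempotentFailureDemo {
  // Hypothetical helper: returns true only for the call that actually
  // completed the promise; later calls are harmless no-ops.
  def jobFailed(promise: Promise[Unit], exception: Exception): Boolean =
    promise.tryFailure(exception)
}
```

Because `tryFailure` performs the check and the completion as one atomic step, no separate `isCompleted` guard is needed, and the first recorded exception is the one that sticks.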
retest this please
@tdas could you take a look at this PR again? I think it's better to merge this one first.
Test build #61 has finished for PR 7385 at commit
Test build #38083 has finished for PR 7385 at commit
Test build #1162 has finished for PR 7385 at commit
Jenkins, retest this please.
Test build #38119 has finished for PR 7385 at commit
Test build #38258 has finished for PR 7385 at commit
Test build #38280 has finished for PR 7385 at commit
Test build #1191 has finished for PR 7385 at commit
Test build #1192 has finished for PR 7385 at commit
Jenkins, retest this please.
Test build #38573 has finished for PR 7385 at commit
Test build #2093 has finished for PR 7385 at commit
@JoshRosen could you close this one, since #9264 fixes the issue as well? Thanks!
@zsxwing @JoshRosen Does the comment need attention since the PR is closed, https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L438?
That's a good find. cc @zsxwing
Thanks @jaceklaskowski I will submit a PR to remove
This commit refactors SimpleFutureAction so that registering a callback with `onComplete` does not immediately tie up a thread in the provided execution context.
As @zsxwing noticed in #7276 (comment), the existing implementation of SimpleFutureAction.onComplete creates a separate thread to wrap the blocking `awaitResult()` callback.
This PR addresses this issue by making JobWaiter into a Future and using that future in the implementation of SimpleFutureAction's future methods.
This patch was pair-programmed with @tdas.
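The difference between the two approaches can be sketched as follows. This is an illustration under assumptions, not the code from either version of SimpleFutureAction: `CallbackStyles`, `onCompleteBlocking`, and `onCompleteNonBlocking` are hypothetical names, and `awaitResult` is modeled as a plain blocking function.

```scala
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.util.Try

object CallbackStyles {
  // Blocking style: every registration submits a task that sits on an
  // executor thread until the (hypothetical) awaitResult() call returns.
  def onCompleteBlocking[T, U](awaitResult: () => T)(func: Try[T] => U)(
      implicit ec: ExecutionContext): Unit =
    ec.execute(new Runnable {
      def run(): Unit = func(Try(blocking(awaitResult())))
    })

  // Future-based style: the callback is only scheduled on the executor
  // once the future completes, so pending callbacks hold no threads.
  def onCompleteNonBlocking[T, U](jobFuture: Future[T])(func: Try[T] => U)(
      implicit ec: ExecutionContext): Unit =
    jobFuture.onComplete(func)
}
```

With a small thread pool, many pending registrations in the blocking style could exhaust the pool while the job is still running; the future-based style avoids that because nothing runs until completion.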