[SPARK-9026] [SPARK-4514] Modifications to JobWaiter, FutureAction, and AsyncRDDActions to support non-blocking operation #9264
Conversation
…reads while waiting for jobs
Test build #1952 has finished for PR 9264 at commit
This seems like it overlaps with #7385, an old PR of mine which was put on hold when I became too busy with other tickets (and when the original motivation for it was worked around via another means). This would still be good to fix, though, and I'm happy to work with you to try to do this.
@JoshRosen Agreed that this is largely a duplicate. One of SPARK-9026 and SPARK-11296 should be marked as a dup and closed. And FWIW, at least the test failures on the jobFailed race that you were dealing with in that PR have since been dealt with in a separate, recently merged PR.
I agree that the JIRA issue is a duplicate. Odd that the old one didn't turn up when I searched for existing issues. I've linked the issues in JIRA.
It looks like our approaches are very similar, though my changes to …
I've added some unit tests that verify that my changes do, indeed, stop callbacks from consuming threads before the jobs have completed.
What else needs to be done to get this PR merged?
```diff
 import scala.concurrent._
 import scala.concurrent.duration.Duration
-import scala.util.{Failure, Try}
+import scala.util.Try
```
nit: I know you didn't introduce this, but as long as you're touching it -- the scala imports should go above the spark imports (still in their own group)
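Applying the convention mentioned in the nit (scala imports in their own group above the spark imports), the grouping would look something like this; the spark-group import shown is purely illustrative:

```scala
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try

import org.apache.spark.SparkContext  // illustrative member of the spark group
```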
Hi @reggert, I'm not the best person to review this, so I can't definitively say what needs to be done to get it merged. I pointed out a few minor style issues just to keep things moving along. You also need to resolve conflicts with master.
Jenkins, ok to test |
Test build #45395 has finished for PR 9264 at commit
```scala
/**
 * Utility functions used by the query planner to convert our plan to new aggregation code path.
 */
object Utils {
```
why was this removed?
Git somehow managed to screw up the merge when I re-sync'd with the upstream master. I'll resurrect it.
@reggert I've closed your issue as a duplicate. Can you remove it from the title and use only …
@squito I merged with master 2 days ago, and more conflicts were introduced on master less than 24 hours later. I'll merge one more time in order to fix the issue with the accidentally-deleted Utils class, but I won't do it again after that until it's the only thing left to do.
Test build #45491 has finished for PR 9264 at commit
…ests for non-blocking; cleaned up a few minor style issues in the process; added comment explaining why we're using Thread.sleep
Back up to date with master. All style issues should be resolved.
SparkQA seems to be happy now.
It appears that this PR may also fix SPARK-4514.
…rackerSuite This reverts commit 38b1442.
Correction: This PR only fixes SPARK-4514 as long as the …
… within takeAsync
I've implemented a two-line fix for SPARK-4514. Note that trying to fix it outside of this PR would have resulted in a merge conflict.
Test build #46498 has finished for PR 9264 at commit
Test build #46499 has finished for PR 9264 at commit
@zsxwing, could you take a look at this patch? I think you'd be a good reviewer for this given your experience with similar patches in streaming.
Any feedback?
@zsxwing @JoshRosen I just want to make sure that you guys haven't forgotten about this. I haven't heard anything in a week and a half.
All I want for Christmas is a code review. :-) |
```scala
// Now allow the executors to proceed with task processing.
starter.release(rdd.partitions.length)
// Waiting for the result verifies that the tasks were successfully processed.
// This mainly exists to verify that we didn't break task deserialization.
```
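The gating pattern in this test fragment can be sketched outside of Spark with a plain semaphore; `numTasks` and the doubling work below are stand-ins for the RDD partitions and task bodies, not the actual test code:

```scala
import java.util.concurrent.Semaphore

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for the test's semaphore gate: "tasks" block on acquire()
// until the test thread explicitly releases permits for them.
val starter = new Semaphore(0)
val numTasks = 4

// Each "task" waits for a permit before doing its work.
val result: Future[Seq[Int]] =
  Future.traverse(1 to numTasks) { i =>
    Future {
      starter.acquire()
      i * 2
    }
  }

// Now allow the "tasks" to proceed, then await the combined result.
starter.release(numTasks)
val doubled = Await.result(result, 15.seconds)
println(doubled.sum)  // prints 20
```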
Since the callback doesn't run, how can we verify that we didn't break task deserialization?
Unless I'm mistaken, the `Await.result` call will throw an exception if the job fails.
To clarify, in the event that we've broken task deserialization, I would expect to see a `NotSerializableException` or similar error thrown from the `Await.result` call.
Just tested the logic locally. A `Task not serializable` error will be thrown from `mapPartitions` if I add a non-serializable reference to the closure. But since nothing ever calls `executionContextInvoked.failure`, the only exception that can be thrown from `Await.result(executionContextInvoked.future, atMost = 15.seconds)` is a `TimeoutException`.
How did you test it? My expectation is the same as Richard's: that `Await.result` will throw the underlying exception. For example:
```scala
scala> val fi: Future[Int] = Future { throw new java.lang.RuntimeException() }
fi: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@3f200884

scala> try { Await.result(fi, 3 seconds) } catch {
     |   case _: java.util.concurrent.TimeoutException => println("Timeout")
     |   case _: java.lang.RuntimeException => println("Runtime")
     |   case _: Throwable => println("Huh?")
     | }
Runtime
res1: AnyVal = ()
```
The problem is that here it uses `executionContextInvoked.future` to call `Await.result`, but the only place that completes the Promise is `executionContextInvoked.success(())`.
Ah, got it -- yes, that does appear to be swallowing any exception from `mapPartitions`. Good eyes!
If `mapPartitions` throws an exception, won't that cause the test to fail before we even get to `Await.result`?
> If mapPartitions throws an exception, won't that cause the test to fail before we even get to Await.result?

I'm not against `Await.result` here. I just wanted to point out the comment is wrong.
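The behavior being debated can be reproduced in plain Scala. In this sketch, `invoked` stands in for the test's `executionContextInvoked` promise; the names and timeouts are illustrative:

```scala
import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for executionContextInvoked: nothing ever calls failure(...)
// on it, so a failure elsewhere cannot complete it.
val invoked = Promise[Unit]()

// Stand-in for the failing job body; its exception completes *this*
// future, not the promise above.
val failingJob = Future[Unit] { throw new IllegalStateException("task failed") }

val observed =
  try {
    Await.result(invoked.future, 1.second)
    "completed"
  } catch {
    case _: TimeoutException => "timeout"
    case _: IllegalStateException => "job exception"
  }

// The promise never completes, so awaiting it times out rather than
// rethrowing the job's exception -- the failure is swallowed.
println(observed)  // prints "timeout"
```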
Oh, is that all? :-)
Comment line removed.
@reggert sorry for the delay. LGTM except a nit for the test. Thanks very much.
Test build #47629 has finished for PR 9264 at commit
LGTM
Thanks, merging into master.
By the way, @reggert how is this related to SPARK-4514?
@andrewor14 I included a couple of lines to propagate the local properties (from the thread that created the …
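For context, SPARK-4514 concerns SparkContext's thread-local job properties not being visible to jobs submitted by the async actions. A hedged sketch of the public API involved (this assumes a live `SparkContext` named `sc`; the property key and value are illustrative, and this is not the PR's actual fix):

```scala
// Assumes a running SparkContext named sc.
// Local properties are thread-local: before the fix, a property set on
// the calling thread could be lost when an async action submitted the
// job from a different thread.
sc.setLocalProperty("spark.jobGroup.id", "my-async-group")

// With the fix, the asynchronously submitted job should observe the
// property set on the submitting thread above.
val countFuture = sc.parallelize(1 to 10).countAsync()
```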
…oesn't create a separate thread to wait for the job result

Before apache#9264, submitJob would create a separate thread to wait for the job result. `submitJobThreadPool` was a workaround in `ReceiverTracker` to run these waiting-for-job-result threads. Now that apache#9264 has been merged to master and resolves this blocking issue, `submitJobThreadPool` can be removed.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#10560 from zsxwing/remove-submitJobThreadPool.
## What changes were proposed in this pull request?

Fix this use case, which was already fixed in SPARK-10548 in 1.6 but was broken in master due to #9264:

```scala
(1 to 100).par.foreach { _ =>
  sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
}
```

This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA.

## How was this patch tested?

New test in `SQLExecutionSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes #11586 from andrewor14/fix-concurrent-sql.
These changes rework the implementations of `SimpleFutureAction`, `ComplexFutureAction`, `JobWaiter`, and `AsyncRDDActions` such that asynchronous callbacks on the generated `Future`s NEVER block waiting for a job to complete. A small amount of mutex synchronization is necessary to protect the internal fields that manage cancellation, but these locks are only held very briefly, and in practice should almost never cause any blocking to occur. The existing blocking APIs of these classes are retained, but they simply delegate to the underlying non-blocking API and `Await` the results with indefinite timeouts.

Associated JIRA ticket: https://issues.apache.org/jira/browse/SPARK-9026
Also fixes: https://issues.apache.org/jira/browse/SPARK-4514
This pull request contains all my own original work, which I release to the Spark project under its open source license.
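The non-blocking approach described above can be sketched with a plain `Promise`. This is a simplified illustration of the pattern, not the actual `JobWaiter` code; the class and method names are invented for the example:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Simplified sketch: scheduler callbacks complete a Promise, so
// callbacks registered on the Future never tie up a waiting thread.
class SketchJobWaiter[T] {
  private val promise = Promise[T]()

  // Invoked by the scheduler when the job finishes successfully.
  def jobSucceeded(result: T): Unit = promise.trySuccess(result)

  // Invoked by the scheduler when the job fails.
  def jobFailed(exception: Exception): Unit = promise.tryFailure(exception)

  // Non-blocking view of the job outcome.
  def completionFuture: Future[T] = promise.future

  // Retained blocking API, delegating to the non-blocking future.
  def awaitResult(): T = Await.result(completionFuture, Duration.Inf)
}

// Usage: register a callback without blocking any thread, then have the
// "scheduler" complete the job.
val waiter = new SketchJobWaiter[Int]
waiter.completionFuture.foreach(n => println(s"job finished with $n"))
waiter.jobSucceeded(42)
```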