-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-3902] [SPARK-3590] Stabilize AsynRDDActions and add Java API #2760
Conversation
/cc's for review:
|
import java.util.List; | ||
import java.util.concurrent.Future; | ||
|
||
public interface JavaFutureAction<T> extends Future<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it makes sense to expose an extended version of the Java Future
API to users, since there may be a number of existing libraries for consuming these standard future types.
QA tests have started for PR 2760 at commit
|
*/ | ||
@Experimental | ||
def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, my PR breaks compatibility for this experimental Java API. However, the previous version of this method hasn't been shipped in any Spark releases yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea i think this is fine
QA tests have started for PR 2760 at commit
|
Looks great! I think it's very useful to have these async APIs in java :-) |
QA tests have finished for PR 2760 at commit
|
Test PASSed. |
QA tests have finished for PR 2760 at commit
|
Test PASSed. |
@@ -17,6 +17,7 @@ | |||
|
|||
package org.apache.spark.api.java | |||
|
|||
import java.util |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
an extra import?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch; fixed.
@rxin I removed that extra import. Any other comments or things you'd like to discuss? |
QA tests have started for PR 2760 at commit
|
} | ||
|
||
override def jobIds(): java.util.List[java.lang.Integer] = { | ||
new java.util.ArrayList(futureAction.jobIds.map(x => new Integer(x)).asJava) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm... kinda sucks you have to make the conversion manually.
I'd use Collections.unmodifiableList()
and Integer.valueOf()
here, though.
QA tests have finished for PR 2760 at commit
|
Test PASSed. |
case scala.util.Success(value) => converter(value) | ||
case Failure(exception) => | ||
if (isCancelled) { | ||
throw new CancellationException("Job cancelled: ${exception.message}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing s""
? I'd also use the exception as the cause of the CancellationException
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch; I'll fix the s""
. Unfortunately, CancellationException
doesn't have a constructor that accepts a cause exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, but I can do this:
val ce = new CancellationException("Job cancelled")
ce.initCause(exception)
throw ce
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or, even more succinctly:
throw new CancellationException("Job cancelled").initCause(exception)
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); | ||
JavaRDD<Integer> rdd = sc.parallelize(data, 1); | ||
JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync(); | ||
Thread.sleep(200); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why this is necessary. The get()
call below should cover this already.
HI @JoshRosen, this looks mostly good. I have some comments on the tests because I'm generally wary of tests that rely too heavily on timeouts. |
Hi @vanzin, Thanks for the review feedback. I've significantly simplified the tests and incorporated your comments. |
Test FAILed. |
Jenkins, test this please. |
QA tests have started for PR 2760 at commit
|
JavaRDD<Integer> rdd = sc.parallelize(data, 1); | ||
JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync(); | ||
try { | ||
long count = future.get(2, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: variable is unused.
Latest changes LGTM, thanks! |
QA tests have finished for PR 2760 at commit
|
Test PASSed. |
QA tests have started for PR 2760 at commit
|
QA tests have finished for PR 2760 at commit
|
Test PASSed. |
…n-java Conflicts: project/MimaExcludes.scala
QA tests have started for PR 2760 at commit
|
@rxin Did you have any other feedback here? If not, I'd like to merge this. |
QA tests have finished for PR 2760 at commit
|
Test PASSed. |
LGTM - we discussed some details of this offline last week. |
Alright, I'm going to merge this. Thanks! |
Reviewed again after the fact. LGTM. |
This PR adds a Java API for AsyncRDDActions and promotes the API from
@Experimental
to stable.