[SPARK-24947] [Core] aggregateAsync and foldAsync #21971

Closed
wants to merge 1 commit into from

Conversation

ceedubs
Contributor

@ceedubs ceedubs commented Aug 2, 2018

See the description in the [Jira ticket](https://issues.apache.org/jira/browse/SPARK-24947).

This contribution is my original work (inspired by similar methods in
the Spark code) and I license this work to Spark under the Apache
License 2.0.

What changes were proposed in this pull request?

Add aggregateAsync and foldAsync methods to AsyncRDDActions.
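For readers unfamiliar with the shape of these methods, here is a minimal, Spark-free sketch of the semantics under stated assumptions. It is not the code from this PR: a `Seq[Seq[T]]` stands in for an RDD's partitions, a plain `scala.concurrent.Future` stands in for `FutureAction`, and the object name `AggregateAsyncSketch` is hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Illustrative only: models aggregate/fold semantics without Spark.
object AggregateAsyncSketch {
  // Each inner Seq plays the role of one partition (an assumption, not an RDD).
  def aggregateAsync[T, U](partitions: Seq[Seq[T]])(zeroValue: U)(
      seqOp: (U, T) => U, combOp: (U, U) => U)(
      implicit ec: ExecutionContext): Future[U] = {
    // Fold every "partition" independently, as a task would.
    val partials: Seq[Future[U]] =
      partitions.map(p => Future(p.foldLeft(zeroValue)(seqOp)))
    // Merge the per-partition results once they are all available.
    Future.sequence(partials).map(_.foldLeft(zeroValue)(combOp))
  }

  // fold is the special case where U = T and one operator plays both roles.
  def foldAsync[T](partitions: Seq[Seq[T]])(zeroValue: T)(op: (T, T) => T)(
      implicit ec: ExecutionContext): Future[T] =
    aggregateAsync(partitions)(zeroValue)(op, op)
}
```

With partitions `Seq(Seq(1, 2, 3), Seq(4, 5))`, a zero of `(0, 0)`, a `seqOp` that adds to the sum and increments the count, and a `combOp` that adds pairs component-wise, the future completes with `(15, 5)`. The caller gets the future back immediately and can block with `Await.result` or attach a callback.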

How was this patch tested?

Unit tests (included in PR).

   * @see [[RDD.aggregate]] which is the synchronous version of this method.
   */
  def aggregateAsync[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): FutureAction[U] =
    self.withScope {
Contributor Author

In the synchronous version of aggregate, the zeroValue is cloned, which requires adding an implicit ClassTag[U] argument. I didn't really understand the motivation for that, so I didn't do it here, but I was hoping that someone who understood the cloning could let me know here.

Contributor

IMHO one reason could be the fail-fast principle. The cloning uses serialisation/deserialisation, which is needed anyway to send the neutral element to the executors. This way, serialisation of the neutral element is tested when the operator is specified, not during lazy execution in the middle of a long chain of operations.
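To make the fail-fast argument concrete, here is a small sketch of cloning a value by round-tripping it through serialization. It is an illustration under assumptions, not Spark's own cloning code: it uses plain Java serialization rather than a Spark serializer (so it needs no `ClassTag`), and the names `ZeroValueCloneSketch` and `cloneViaSerialization` are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Illustrative only: plain Java serialization stands in for Spark's serializer.
object ZeroValueCloneSketch {
  // Round-trips `value` through serialization and returns an independent copy.
  // A value that cannot be serialized throws java.io.NotSerializableException
  // here, at call time, rather than later when a task is shipped.
  def cloneViaSerialization[U <: AnyRef](value: U): U = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[U] finally in.close()
  }
}
```

Besides surfacing serialization problems early, the copy is independent of the original, so mutating an accumulator that started from the clone does not touch the caller's zero value. For example, cloning a `java.util.ArrayList` and adding to the clone leaves the original list unchanged, while passing a bare `new Object` fails immediately.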

    self.withScope {
      val cleanSeqOp = self.context.clean(seqOp)
      val cleanCombOp = self.context.clean(combOp)
      val combBinOp = new BinaryOperator[U] {
Contributor Author

Is there a cleaner way to integrate with BinaryOperator before Scala 2.12?
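One possible answer, sketched under assumptions: before Scala 2.12 a function literal is not converted to a Java functional interface automatically, so the anonymous class can at least be hidden behind a small helper. The names `BinaryOperatorSketch`, `asBinaryOperator`, and `combineAll` are hypothetical, and the use of `AtomicReference.accumulateAndGet` is only a guess at how the operator might be consumed.

```scala
import java.util.concurrent.atomic.AtomicReference
import java.util.function.BinaryOperator

// Illustrative only: hides the anonymous BinaryOperator behind one helper.
object BinaryOperatorSketch {
  // Wraps a Scala function in a java.util.function.BinaryOperator.
  def asBinaryOperator[U](f: (U, U) => U): BinaryOperator[U] =
    new BinaryOperator[U] {
      override def apply(a: U, b: U): U = f(a, b)
    }

  // Merges partial results into an atomically updated accumulator.
  def combineAll[U](zero: U, partials: Seq[U])(combOp: (U, U) => U): U = {
    val acc = new AtomicReference[U](zero)
    val op = asBinaryOperator(combOp)
    partials.foreach(p => acc.accumulateAndGet(p, op))
    acc.get()
  }
}
```

From Scala 2.12 onward, a function literal can be passed directly where a `BinaryOperator` is expected, which makes the helper unnecessary there.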

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

github-actions bot commented Jan 8, 2020

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 8, 2020
@github-actions github-actions bot closed this Jan 9, 2020
4 participants