Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARK-3655 GroupByKeyAndSortValues #3632

Conversation

koertkuipers
Copy link
Contributor

See https://issues.apache.org/jira/browse/SPARK-3655

This pullreq is based on the approach that uses repartitionAndSortWithinPartition, but only implements GroupByKeyAndSortValues and not foldLeft.

…instead of RDD[(K, Iterable[V]). i dont think the Iterable version can be implemented efficiently
…he values (the iterables) are in-memory arrays
@markhamstra
Copy link
Contributor

On a first pass, this doesn't look right. If you are providing additional methods that should be available for RDD[(K, V)] where there is an Ordering available for both K and V, then it seems that the strategy that you should be following should not be to add methods directly to PairRDD, but rather to go one step further than does OrderedRDD, whose purpose is to provide additional methods for RDD[(K, V)] where there is an Ordering available for K.

Please read this comment: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala#L26

...and look at how the Ordering context bound is used:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala#L26
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala#L26

@koertkuipers
Copy link
Contributor Author

hey @markhamstra

i assume you are referring to the one method groupByKeyAndSortValues that has an implicit Ordering[V] parameter, since the other groupByKeyAndSortValues methods do not require an implicit Ordering to be available for the values (it needs to be explicitly provided and is typically use-case specific and not the natural ordering of values).

what the benefit is of an implicit conversion to a class that will have this one method without the implicit parameter, versus a single method with an implicit parameter? it seems to me this makes the api harder to understand. is this for java perhaps?
i have to admit i am not sure i understand the reason for OrderedRDDFunctions existence either (again 2 methods on PairRDDFunctions with implicits seem preferred to me over a new class).

@markhamstra
Copy link
Contributor

The reason for separate classes is to cleanly segregate the available/supportable functionality. Not every PairRDD has keys that can be ordered, so sortByKey shouldn't be part of PairRDD. When keys can be ordered, there is often a natural ordering that is already implicitly in scope. When that is true, then we don't want to force the user to explicitly provide an Ordering -- e.g. if you have an RDD[Int, Foo], then rdd.sortByKey() should just work. If you want a different Ordering, then you just need to bring a new implicit Ordering for that key type into scope.

Things aren't as cleanly separated in the Java API because of the lack of support for implicits there, but that doesn't mean that we should abandon the separation between PairRDD and OrderedRDD on the Scala side or start dirtying-up PairRDD.scala when we want to provide new methods for RDDs whose keys and values can both be ordered.

I really think that we want to repeat the pattern of OrderedRDD for these DoublyOrderedRDD -- or whatever better name you can come up with. The biggest quirk I can see right now is if the types of both keys and values are the same but you want to order them one way when sorting by key and a different way when doing the secondary sort on values. That won't work with implicits since there can only be one implicit Ordering for the type in scope at a time. The problem could either be avoided by using distinct types for the key and value roles, or a method signature with explicit orderings could be added to address this corner case.

@koertkuipers
Copy link
Contributor Author

mhhh i dont really agree with you. i find OrderedRDD confusing because:

  1. you kind of have to know that there is an implicit conversion to OrderedRDD somewhere out there that only works in certain conditions. a method on PairRDDFunctions with an implicit Ordering[V] parameter is part of the public API and also very clear in the restrictions it imposes (since its documented in the required implicit parameter that is part of the API).
  2. its much harder to override the Ordering[V]. forcing the user to bring a new implicit Ordering[V] in scope just to use it once is a somewhat bad use of implicits (and hard to debug). an implicit parameter should allow for an explicit override in my opinion.

anyhow, you are right that what i did does not confirm with OrderedRDD. so if others agree with you i will rewrite it like that, no problem! good point on the corner case of K and V having same type... let me think about that.

@koertkuipers
Copy link
Contributor Author

woops sorry i hit the wrong button there. didnt mean to close this pullreq.

@markhamstra
i will try to update this pullreq sometime in first few weeks of january to address issues you raised

@markhamstra
Copy link
Contributor

@koertkuipers Don't get me wrong, I'm not arguing that the way that PairRDDFuntions and OrderedRDDFunctions work is objectively the best and unquestionably an exemplary use of implicits. It is, however, what we've got and does follow a certain logic. What I'm saying is that it is good to maintain that logic and pattern for an API extension that is similar to OrderedRDDFunctions.

Changing to a different pattern is something we could consider for Spark 2.0 when we can break the established public API.

@koertkuipers
Copy link
Contributor Author

@markhamstra take a look now.
i ignored the situation of K and V having same type, since i think it can be dealt with by using a simple wrapper (value) class for the Vs.

ordering1: Option[Ordering[A]], ordering2: Option[Ordering[B]]
) extends Ordering[Product2[A, B]] {
private val ord1 = ordering1.getOrElse(new HashOrdering[A])
private val ord2 = ordering2.getOrElse(new NoOrdering[B])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the expected scenario in which a KeyValueOrdering is called for with B unordered? You're setting up KeyValueOrdering to be more general than your needs for its only current usage in OrderedValueRDDFunctions, but I'm not quite grasping how and where else you are expecting KeyValueOrdering to be used.

It's seeming to me that KeyValueOrdering should have two ctors:

KeyValueOrdering[A, B](keyOrdering: Ordering[A], valueOrdering: Ordering[B])

...

this(valueOrdering: Ordering[B]) = this(new HashOrdering[A], valueOrdering)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah thats right i copied it from another pullreq by me that needed a more general version. i can simplify it.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants