
[SPARK-18466] added withFilter method to RDD #15899

Closed
wants to merge 1 commit


@reggert (Contributor) commented Nov 16, 2016

What changes were proposed in this pull request?

A withFilter method has been added to RDD as an alias for the filter method. When using for comprehensions, the Scala compiler prefers (and as of 2.12, requires) the lazy withFilter method, only falling back to using the filter method (which, for regular collections, is non-lazy, but for RDDs is lazy). Prior to Scala 2.12, this fallback causes the compiler to emit a warning, and as of Scala 2.12, it results in an error.
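
For reference, a minimal sketch of what the added alias might look like on RDD, assuming it simply delegates to filter (the actual commit may also wrap the call in withScope and carry a fuller doc comment):

/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 * Alias for `filter`, so that for comprehensions (whose guards and pattern
 * matches are rewritten by the compiler to `withFilter` calls) work on RDDs.
 */
def withFilter(f: T => Boolean): RDD[T] = filter(f)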

How was this patch tested?

RDDSuite was updated by adding a line to "basic operations" that duplicates the behavior of the filter test, but uses a for comprehension instead of a direct method call.

@@ -70,6 +70,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
assert(!nums.isEmpty())
assert(nums.max() === 4)
assert(nums.min() === 1)
assert((for (n <- nums if n > 2) yield n).collect().toList === List(3, 4))
Inline review comment (Contributor):

what does this get compiled into?

Reply from @reggert (Contributor, Author):

According to IntelliJ, the for comprehension desugars into

nums.withFilter(n => n > 2).map(n => n)

@srowen (Member) commented Nov 16, 2016

You're right that it's not hard to add, but is this really the intended usage of an RDD? I don't know how much we want to make it operate like a local collection. I'm not strongly against it, but if this is added, are there not some other places that need the same treatment, like Dataset?

@reggert (Contributor, Author) commented Nov 16, 2016

Using RDDs in for comprehensions dates back at least to the examples in the 2010 AMPLab paper "Spark: Cluster Computing with Working Sets". For comprehensions are in no way limited to local collections: they provide syntactic sugar that, for many use cases, makes code easier to follow than a series of chained method calls. Other examples of using them with non-collections include the Scala Future class, as well as query construction in the Slick database access library.

I agree that a similar change should be made for other abstractions built on top of RDDs, such as Datasets and DStreams, though. I'll add those when I get a chance.
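
To illustrate the point about non-collection types (a sketch with hypothetical helper functions, not part of the original comment), Scala's Future composes through the same sugar:

import scala.concurrent.{ExecutionContext, Future}
import ExecutionContext.Implicits.global

def fetchUserId(): Future[Int] = Future(42)                 // hypothetical async call
def fetchScore(id: Int): Future[Double] = Future(id * 0.5)  // hypothetical async call

// Desugars to fetchUserId().flatMap(id => fetchScore(id).map(score => score)),
// just as a for comprehension over an RDD desugars to RDD methods.
val result: Future[Double] =
  for {
    id    <- fetchUserId()
    score <- fetchScore(id)
  } yield score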

@srowen (Member) commented Nov 16, 2016

OK, that makes sense. Yes, in reality it's just syntactic sugar. I suppose I wonder: if this works, are there other sugary things one would expect to work that don't? And does it add to the confusion about what happens locally versus remotely? Maybe the latter isn't a big enough reason to withhold the syntax, though.

@reggert (Contributor, Author) commented Nov 16, 2016

The only other weird case I've run into is trying to flatMap across multiple RDDs, e.g., for (x <- rdd1; y <- rdd2) yield x + y, but it simply won't compile because RDD.flatMap doesn't support it (for good reason). You can do it for an RDD and a regular collection, e.g., for (x <- rdd; y <- 1 to 10) yield x + y, which is a perfectly reasonable use case. Other than that, the only issues that I'm aware of are ones that would also affect regular (desugared) chained method calls.
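
A sketch of the two cases mentioned above (assuming an existing RDD[Int]; the desugarings shown in comments are approximate):

import org.apache.spark.rdd.RDD

// Mixing an RDD with a local collection compiles: the inner generator
// desugars to a flatMap whose function returns a local collection.
def mixed(rdd: RDD[Int]): RDD[Int] =
  for (x <- rdd; y <- 1 to 10) yield x + y
// roughly: rdd.flatMap(x => (1 to 10).map(y => x + y))

// Two RDD generators do not compile: the desugared inner function would
// return an RDD, but RDD.flatMap expects a TraversableOnce.
// def crossed(rdd1: RDD[Int], rdd2: RDD[Int]): RDD[Int] =
//   for (x <- rdd1; y <- rdd2) yield x + y
// roughly: rdd1.flatMap(x => rdd2.map(y => x + y))   // type error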

@rxin (Contributor) commented Nov 16, 2016

I would vote to explicitly discourage this kind of use case. By encouraging it we create the illusion that for comprehensions can be used freely, when in reality there are a lot of gotchas.

@rxin (Contributor) commented Nov 17, 2016

@reggert @srowen
any reaction to my pushback?

@reggert (Contributor, Author) commented Nov 17, 2016

I disagree strongly. I've used RDDs in for comprehensions for almost 2 years without issue. Being able to "extend for loops" in this way is a major language feature of Scala that helps in writing expressive code. Refusing to support it would surprise and irritate most Scala developers.


@rxin (Contributor) commented Nov 18, 2016

I don't get it. The only thing this adds is simple syntactic sugar over filter and map, and the sugar doesn't even work in general. Isn't it more surprising to fail in some cases?

@reggert (Contributor, Author) commented Nov 18, 2016

I don't get why you say that it "doesn't even work in general". Under what circumstances doesn't it work? I've never run into any problems with it.

The "simple syntactic sugar" allows very clear, concise code to be written in many cases, and even lets you take advantage of Scala pattern matching for filtering. For example:

val strings = sparkContext.parallelize(List("1213,999", "abc", "456,789"))
val NumberPairString = """(\d{1,5}),(\d{1,5})""".r
val numbers = for (NumberPairString(a, b) <- strings; n <- Seq(a, b)) yield n.toInt
// numbers.collect() yields Array[Int](1213, 999, 456, 789)

Without the for comprehension, you wind up with this significantly uglier and somewhat confusing chain of calls:

val strings = sparkContext.parallelize(List("1213,999", "abc", "456,789"))
val NumberPairString = """(\d{1,5}),(\d{1,5})""".r
val numbers = strings.filter {
  case NumberPairString(_, _) => true
  case _ => false
}.flatMap {
  case NumberPairString(a, b) => Seq(a, b)
}.map(_.toInt)
// numbers.collect() yields Array[Int](1213, 999, 456, 789)

There are alternate ways to write this (e.g., with a single flatMap on a pattern match function that returns either 0 or 2 elements), but none of them are as clean and concise as the for version.
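
For completeness, a sketch of the single-flatMap alternative mentioned above (returning either zero or two elements per input):

val numbers = strings.flatMap {
  case NumberPairString(a, b) => Seq(a.toInt, b.toInt)  // two elements on a match
  case _                      => Seq.empty[Int]         // zero elements otherwise
}
// numbers.collect() yields Array[Int](1213, 999, 456, 789)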

@danielyli (Contributor) commented:

Hello,

I found this issue after encountering the error "'withFilter' method does not yet exist on RDD[(Int, Double)], using 'filter' method instead" in my code.

I'm writing a somewhat complicated flatMap-flatMap-map expression involving pair RDDs, and the code is becoming busy enough that sugaring it into a for expression is warranted for readability. Since I'm not using any filters or ifs in the for expression, I found the above error message puzzling. After some tinkering, I think I've found a minimal reproducible case:

for ((k, v) <- pairRdd) yield ...    // pairRdd is of type RDD[(_, _)]

Curiously, the withFilter error doesn't occur if I write for (x <- pairRdd) yield .... @rxin, do you have any insight into this?

@reggert (Contributor, Author) commented Mar 21, 2017

The (k, v) <- pairRdd expression involves a pattern match, which the compiler converts into a filter/withFilter call that keeps only the items matching the pattern.
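
A sketch of that rewrite on a local Seq (where withFilter already exists), with placeholder data; the RDD case fails at exactly the inserted withFilter step when the method is missing:

val pairs = Seq(1 -> 1.0, 2 -> 2.5)

// for ((k, v) <- pairs) yield (k, v * 2) is rewritten by the compiler,
// approximately, into a guard followed by a map:
val desugared = pairs
  .withFilter { case (k, v) => true }   // guard inserted because (k, v) is a pattern
  .map { case (k, v) => (k, v * 2) }

// A plain variable pattern needs no guard, which is why
// for (x <- pairRdd) yield ... does not trigger the withFilter error.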

@danielyli (Contributor) commented:

@rxin, is it possible for Spark to support extractors in for expressions with pair RDDs?

@danielyli (Contributor) commented:

Hey,

Checking in again on this PR. Can we please support withFilter for pair RDDs? For expressions are a central piece of syntactic sugar in Scala, and without them developers working with RDDs lose expressiveness they have with other Scala container types.

@rxin, are you able to back up your claims that (1) "there are a lot of gotchas" and (2) "the sugar doesn't even work in general"?

@reggert (Contributor, Author) commented Apr 8, 2017

Strictly speaking, this doesn't just affect pair RDDs. It affects any RDD used in a for expression that involves a filter operation, which includes explicit if clauses as well as pattern matches.

@danielyli (Contributor) commented:

I'm simply making an argument for a specific use case, though you're right, it's used for more than just pattern matching.

@AmplabJenkins commented:

Can one of the admins verify this patch?

@@ -387,6 +387,14 @@ abstract class RDD[T: ClassTag](
preservesPartitioning = true)
}

/**
* Return a new RDD containing only the elements that satisfy a predicate.
Inline review comment (Member):

Hi, @reggert. Could you fix the indentation?

Inline review comment (Contributor):

Why bother unless we have consensus to introduce this API?

@dongjoon-hyun (Member) commented:

Hi, @rxin, @srowen, @dbtsai, @felixcheung, @gatorsmile, @cloud-fan.

I know this is not a recommended style, but there really are users hitting this issue. And, starting with Spark 2.4.0, we are releasing a Scala 2.12 build as an experiment. This case is a regression there, because the code previously worked with only a warning. I'm +1 on this idea for Spark's Scala 2.12 support. What do you think?

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context available as 'sc' (master = local[*], app id = local-1541571276105).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.12.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> (for (n <- sc.parallelize(Seq(1,2,3)) if n > 2) yield n).toDebugString
<console>:25: error: value withFilter is not a member of org.apache.spark.rdd.RDD[Int]
       (for (n <- sc.parallelize(Seq(1,2,3)) if n > 2) yield n).toDebugString

@rxin (Contributor) commented Nov 7, 2018

Thanks for the example. I didn't even know that was possible in earlier versions. I just looked it up: looks like Scala 2.11 rewrites for comprehensions into map, filter, and flatMap.

That said, I don't think it's a bad deal that this no longer works, given it was never intended to work and there's been a deprecation warning.

I still maintain that it is risky to support this, because Scala users learn for comprehensions not just as a simple "filter and yield", but as a way to chain multiple generators together, which is not really well supported by Spark (and even where it is, it's an easy way for users to shoot themselves in the foot, because it implies a cartesian product).

Rather than making an RDD look like a local collection, users should know that it is not one.
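
For illustration (a sketch with hypothetical names, not from the thread): the supported way to express what two chained RDD generators would mean is an explicit cartesian, which keeps the cost visible at the call site:

import org.apache.spark.rdd.RDD

// Equivalent of the (unsupported) for (x <- rdd1; y <- rdd2) yield x + y:
def pairSums(rdd1: RDD[Int], rdd2: RDD[Int]): RDD[Int] =
  rdd1.cartesian(rdd2).map { case (x, y) => x + y }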

@dongjoon-hyun (Member) commented:

I see. Thank you for the clear decision, @rxin! I'll close the issue as Won't Fix.

And, could you close this PR, @reggert?

@HyukjinKwon (Member) commented:

+1 for the decision and closing it.

@dongjoon-hyun (Member) commented:

Since the issue is closed, this PR will be closed at the next infra clean-up.
