
[SPARK-5744] [CORE] Making RDD.isEmpty robust to empty partitions #4534

Closed
wants to merge 1 commit into from

Conversation

tbertelsen

RDD.isEmpty fails when an RDD contains empty partitions.

@tbertelsen
Author

FYI: The method was introduced in #4074

@AmplabJenkins

Can one of the admins verify this patch?

@markhamstra
Contributor

Thanks for doing this, but the title of this PR isn't sufficient. It will become the commit log message, so please update the PR title to adequately describe what you did so that other developers don't have to look into the details of the commit or look up the JIRA issue just to get an idea of what this PR is about.

-  def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0
+  def isEmpty(): Boolean = partitions.length == 0 || mapPartitions(it => Iterator(!it.hasNext)).reduce(_ && _)
Member


I'll try the test case, sure, to investigate. The case of an empty partition should be handled already by take(), so I don't think that's it per se.

(I'm worried about this logic since it will touch every partition, and the point was to not do so. The 2 changes before this line aren't necessary.)

The exception looks more like funny business in handling Seq() (i.e. type Any) somewhere along the line. I'll look.

Author


You might have a point, that this is actually a bug in take().

sc.parallelize(Seq(1,2,3),1).take(1337) works fine and returns Array[Int] = Array(1, 2, 3).

sc.parallelize(Seq(),1).take(1), on the other hand, fails.

Author


I am getting more convinced that this is a bug in take. It is provoked by RDD[Nothing].take(1). Take, for example, these three commands:

sc.parallelize(Seq[Nothing]()).take(1) // Fails
sc.parallelize(Seq[Any]()).take(1)     // Array[Any] = Array()
sc.parallelize(Seq[Int]()).take(1)     // Array[Int] = Array()

@tbertelsen tbertelsen changed the title Fixing SPARK-5744. Making RDD.isEmpty robust to empty partitions (SPARK-5744) Feb 11, 2015
@markhamstra
Contributor

@tbertelsen Better, but you still should include SPARK-5744 and add [CORE] to the PR title.

@tbertelsen tbertelsen changed the title Making RDD.isEmpty robust to empty partitions (SPARK-5744) SPARK-5744 [CORE] Making RDD.isEmpty robust to empty partitions Feb 11, 2015
@tbertelsen
Author

Sorry. Is it good now?

@tbertelsen tbertelsen changed the title SPARK-5744 [CORE] Making RDD.isEmpty robust to empty partitions [SPARK-5744] [CORE] Making RDD.isEmpty robust to empty partitions Feb 11, 2015
@markhamstra
Contributor

perfect

@srowen
Member

srowen commented Feb 11, 2015

This works, so it's not quite empty partitions:

sc.parallelize(Seq[Int](), 1).isEmpty()

This also creates an exception, so it's to do with Seq() (which is of Nothing, not Any, sorry):

sc.parallelize(Seq(), 1).take(1)

I think the problem roughly boils down to this behavior in Scala:

Seq(Seq().toArray,Seq().toArray).toArray
java.lang.ArrayStoreException: [Ljava.lang.Object;
...

The problem is the Nothing type, rather than emptiness, but I'm still scratching my head figuring out the cleanest way to deal with that.
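The JVM mechanism behind that ArrayStoreException can be shown in a few lines of plain Scala. This is a generic sketch of array covariance on the JVM, not Spark's exact failure path:

```scala
// JVM arrays are covariant at runtime, so a String[] can be viewed as an
// Object[]; every store through that reference is then checked against the
// array's real element type.
val strings: Array[String] = Array("a")
val objects: Array[AnyRef] = strings.asInstanceOf[Array[AnyRef]]
try {
  objects(0) = Integer.valueOf(1) // real element type is String: store fails
} catch {
  case e: ArrayStoreException => println(s"caught: $e")
}
```

In the Spark case, the arrays of per-partition results end up with incompatible runtime element types when the RDD's element type is Nothing, tripping the same runtime check.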

@srowen
Member

srowen commented Feb 11, 2015

I think I have a solution. A Seq[Nothing] must be empty since there are no instances of Nothing. So I think this can be fixed (and optimized) by returning an EmptyRDD from parallelize when given an empty argument.

  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
    assertNotStopped()
    if (seq.isEmpty) {
      new EmptyRDD[T](this)
    } else {
      new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
    }
  }

Witness:

scala> sc.parallelize(Seq(), 1).isEmpty
res0: Boolean = true

scala> sc.parallelize(Seq(), 1).take(1)
res1: Array[Nothing] = Array()

It deserves a unit test for both of these and a run through all the tests. Want to try that?
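The type-level reasoning behind this fix can be checked in plain Scala, independent of Spark (a minimal sketch; no SparkContext involved):

```scala
// A bare Seq() is inferred as Seq[Nothing]; since Nothing has no instances,
// any Seq[Nothing] the compiler accepts must be empty.
val s = Seq()    // inferred as Seq[Nothing]
assert(s.isEmpty)
// Adding an element forces a different element type, so the isEmpty check
// in parallelize only ever short-circuits for genuinely empty input.
val t = Seq(1)   // inferred as Seq[Int], not Seq[Nothing]
assert(t.nonEmpty)
```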

@tbertelsen
Author

How do we treat null values, e.g. sc.parallelize(Seq(new Array[Nothing](1))). I have no experience with TypeTags, but couldn't we use them in a smart way?

By the way sc.parallelize(Seq(new Array[Nothing](1))) fails on every action I have tried.

@srowen
Member

srowen commented Feb 11, 2015

Array[Nothing](1) won't compile by itself, nor will Seq[Nothing](1) for different reasons. new Array[Nothing](1) won't be accepted as a Seq[?] for reasons I don't 100% get.

But there is actually still the same problem with the Null type: sc.parallelize(Seq(null), 1).isEmpty still fails. The change above is good but doesn't fix this case. Back to the drawing boards...

@srowen
Member

srowen commented Feb 11, 2015

Hmph. I think we might be able to work around this if runJob didn't have to return an Array, but it does. Even if it collects its results in another collection, they have to get copied to an Array. I think that's where this runs into the mismatch between these Scala types and JVM arrays and covariance, every way I can see.

All of these are fairly artificial cases. An empty RDD or partition of a normal type works fine. I think it's worth making the EmptyRDD change to take care of the Seq() case, and treating the rest as just how it is with Scala and arrays and these types and the current API.

@tbertelsen
Author

It compiles if you include new. It just becomes a null filled array.

scala> new Array[Nothing](1)
res19: Array[Nothing] = Array(null)

I think you're right about this being fairly artificial. One solution could be just not to accept Nothing as the element type of an RDD, and fail fast and predictably, like this:

    if (implicitly[ClassTag[T]].runtimeClass == classOf[Nothing]) {
      throw new RuntimeException("RDD[Nothing] is not allowed.")
    }

We must however be careful. Right now sc.emptyRDD returns an RDD[Nothing], and there could easily be unforeseen consequences of prohibiting RDD[Nothing].

If we just want to provide a good error message, we could also catch the ArrayStoreException and wrap it in an exception with a more descriptive message, perhaps at the place where it is already wrapped in the SparkDriverExecutionException.
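A minimal sketch of that wrapping idea; withNothingHint and its message are illustrative, not actual Spark code, and where Spark would hook this in is left open:

```scala
// Hypothetical helper: run a job-like body and, if it fails with an
// ArrayStoreException, rethrow with a hint about the likely Nothing cause.
def withNothingHint[T](body: => T): T =
  try body
  catch {
    case e: ArrayStoreException =>
      throw new RuntimeException(
        "Job failed with ArrayStoreException; this often means the RDD's " +
        "element type was inferred as Nothing (e.g. sc.parallelize(Seq())). " +
        "Give the RDD an explicit element type.", e)
  }
```

A call site would then read withNothingHint { rdd.take(1) } and surface the hint instead of a bare ArrayStoreException.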

@srowen
Member

srowen commented Feb 11, 2015

The case of Nothing is actually fine to solve here; you don't have to reject it. It's Null I can't figure out. I think at best, just fix the Nothing case per above.

@srowen
Member

srowen commented Feb 12, 2015

@tbertelsen Here's my proposed fix: srowen@2390a3f

I discovered along the way that histogram() doesn't support an RDD with 0 partitions, so included that fix.

@tbertelsen
Author

It looks like it fixes the issue with Seq(), but not e.g. new Array(2) (I have not run it, though). But after our discussion I think that is perfectly fine.

If you manage to create a non-empty collection of Nothing, then practically nothing will work in Spark, or in Scala for that matter; e.g., (new Array(2)).toList won't compile.

Handling RDD[Nothing] seems like it will require quite some work, but I fail to see the use case where it brings real functionality.

In summary: your fix seems like a good solution. I'll close this PR.

@tbertelsen tbertelsen closed this Feb 13, 2015
@srowen
Member

srowen commented Feb 13, 2015

OK, I can merge my PR. I would be fine with you adding this into yours, with whatever additions you want. That is, I didn't intend to hijack your change, and can credit you in JIRA.
