[SPARK-5744] [CORE] Making RDD.isEmpty robust to empty partitions #4534

Closed · wants to merge 1 commit
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1162,7 +1162,7 @@ abstract class RDD[T: ClassTag](
} else {
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
}
}
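
For context, this loop in take() grows the scan geometrically: it estimates the total number of partitions needed from the rows collected so far, overshoots by 50%, and caps growth at 4x per round. A standalone sketch of the arithmetic (plain Scala reusing the hunk's variable names; not Spark code):

    // Sketch of take()'s partition-growth heuristic; bufSize is the number of
    // rows collected from the first partsScanned partitions.
    def nextPartsToTry(num: Int, partsScanned: Int, bufSize: Int): Int = {
      // Estimate total partitions needed, overshoot by 50%, subtract those
      // already scanned, and always try at least 1 more.
      var numPartsToTry = Math.max((1.5 * num * partsScanned / bufSize).toInt - partsScanned, 1)
      // Never grow by more than 4x the partitions scanned so far.
      numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
      numPartsToTry
    }

    nextPartsToTry(100, 2, 10)  // estimate says 28 more, capped to 2 * 4 = 8

Note that if every scanned partition was empty (bufSize == 0), the Double division yields Infinity, .toInt clamps it to Int.MaxValue, and the 4x cap is what keeps the next round bounded.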

@@ -1253,9 +1253,9 @@ abstract class RDD[T: ClassTag](

 /**
  * @return true if and only if the RDD contains no elements at all. Note that an RDD
- * may be empty even when it has at least 1 partition.
+ * may be empty even when it has 1 or more partitions.
  */
-def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0
+def isEmpty(): Boolean = partitions.length == 0 || mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_)
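
To pin down what the new expression computes, here is a driver-side sketch of the same logic over plain Scala collections (hypothetical partition contents; not Spark code):

    // Each "partition" contributes exactly one Boolean: whether it is empty.
    val partitionContents: Seq[Seq[Int]] = Seq(Seq.empty, Seq.empty, Seq.empty)
    val flags = partitionContents.map(p => !p.iterator.hasNext)  // mirrors Iterator(!it.hasNext)
    // reduce(_&&_): the RDD is empty iff every partition is empty.
    assert(flags.reduce(_ && _))

The trade-off, raised in the review below, is that this runs a task on every partition, while the take(1) version can stop as soon as it finds one element.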
Member:
I'll try the test case, sure, to investigate. The case of an empty partition should be handled already by take(), so I don't think that's it per se.

(I'm worried about this logic since it will touch every partition, and the point was to not do so. The 2 changes before this line aren't necessary.)

The exception looks more like funny business in handling Seq() (i.e. type Any) somewhere along the line. I'll look.

Author:

You might have a point that this is actually a bug in take().

sc.parallelize(Seq(1,2,3), 1).take(1337) works fine and returns Array[Int] = Array(1, 2, 3).

sc.parallelize(Seq(), 1).take(1), on the other hand, fails.

Author:

I am getting more convinced that this is a bug in take(). It is provoked by RDD[Nothing].take(1). Take these three commands, for example:

sc.parallelize(Seq[Nothing]()).take(1) // Fails
sc.parallelize(Seq[Any]()).take(1)     // Array[Any] = Array()
sc.parallelize(Seq[Int]()).take(1)     // Array[Int] = Array()
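
The split between these three cases is a type-inference detail: a bare Seq() is inferred as Seq[Nothing], so the first call builds an RDD[Nothing]. A REPL sketch of the inferred types (assumes any Scala 2.x REPL):

    val a = Seq()        // a: Seq[Nothing] -- the bottom type; no value inhabits it
    val b = Seq[Any]()   // b: Seq[Any]
    val c = Seq[Int]()   // c: Seq[Int]
    // Only the first yields an RDD[Nothing] when parallelized, which is the
    // case take(1) fails on above.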


/**
* Save this RDD as a text file, using string representations of elements.
2 changes: 2 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -570,6 +570,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 assert(!sc.parallelize(Seq(1)).isEmpty())
 assert(sc.parallelize(Seq(1,2,3), 3).filter(_ < 0).isEmpty())
 assert(!sc.parallelize(Seq(1,2,3), 3).filter(_ > 1).isEmpty())
+// Test empty partition (SPARK-5744)
+assert(sc.parallelize(Seq(), 1).isEmpty())
}
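
A few companion assertions (a sketch only, not part of this commit) that cover the typed empty cases from the discussion, which take(1) already handled, plus a non-empty RDD that contains empty partitions:

    assert(sc.parallelize(Seq[Int](), 1).isEmpty())    // typed empty RDD; fine even under take(1)
    assert(sc.parallelize(Seq[Any](), 1).isEmpty())
    assert(!sc.parallelize(Seq(1, 2, 3), 5).isEmpty()) // 5 partitions, 3 elements: some partitions empty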

test("sample preserves partitioner") {