SPARK-5270 [CORE] Provide isEmpty() function in RDD API
Pretty minor, but submitted for consideration -- this would at least help people check whether an RDD is empty in the most efficient way I know.

Author: Sean Owen <sowen@cloudera.com>

Closes #4074 from srowen/SPARK-5270 and squashes the following commits:

66885b8 [Sean Owen] Add note that JavaRDDLike should not be implemented by user code
2e9b490 [Sean Owen] More tests, and Mima-exclude the new isEmpty method in JavaRDDLike
28395ff [Sean Owen] Add isEmpty to Java, Python
7dd04b7 [Sean Owen] Add efficient RDD.isEmpty()
srowen authored and pwendell committed Jan 20, 2015
1 parent e69fb8c commit 306ff18
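
For context, the sketch below (not part of this commit; it assumes an active SparkContext named sc) contrasts the common workaround with the new method:

// The obvious check before this change: count() computes every
// partition just to compare the total with 0.
val rdd = sc.parallelize(Seq.empty[Int], 4)
val emptyViaCount = rdd.count() == 0

// With this change: isEmpty() short-circuits. It is true immediately when
// the RDD has no partitions, and otherwise stops as soon as take(1)
// produces a first element.
val emptyViaIsEmpty = rdd.isEmpty()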
Showing 6 changed files with 62 additions and 0 deletions.
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -38,6 +38,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* Defines operations common to several Java RDD implementations.
* Note that this trait is not intended to be implemented by user code.
*/
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This

@@ -435,6 +439,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def first(): T = rdd.first()

/**
* @return true if and only if the RDD contains no elements at all. Note that an RDD
* may be empty even when it has at least 1 partition.
*/
def isEmpty(): Boolean = rdd.isEmpty()

/**
* Save this RDD as a text file, using string representations of elements.
*/
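
Since JavaRDDLike.isEmpty() simply delegates to the Scala implementation, here is a quick usage sketch of the Java-facing method (written in Scala for consistency with the other examples on this page; it assumes an active SparkContext sc):

import org.apache.spark.api.java.JavaSparkContext

// Wrap the Scala context to reach the Java API, then call the new method.
val jsc = new JavaSparkContext(sc)
val jrdd = jsc.parallelize(java.util.Arrays.asList(1, 2, 3))
assert(!jrdd.isEmpty())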
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1175,6 +1175,12 @@ abstract class RDD[T: ClassTag](
*/
def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)

/**
* @return true if and only if the RDD contains no elements at all. Note that an RDD
* may be empty even when it has at least 1 partition.
*/
def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0

/**
* Save this RDD as a text file, using string representations of elements.
*/
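
A small illustration of the doc comment above (a sketch, assuming an active SparkContext sc): an RDD can own partitions yet contain no elements, which is why the implementation checks partitions.length first and then falls back to take(1).

// 3 partitions, 0 elements: the partition check alone cannot decide,
// so isEmpty() falls through to take(1), which finds nothing.
val empty = sc.parallelize(Seq.empty[Int], 3)
assert(empty.partitions.length == 3)
assert(empty.isEmpty())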
21 changes: 21 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -606,6 +606,27 @@ public void take() {
rdd.takeSample(false, 2, 42);
}

@Test
public void isEmpty() {
Assert.assertTrue(sc.emptyRDD().isEmpty());
Assert.assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
Assert.assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
Assert.assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(
new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer i) {
return i < 0;
}
}).isEmpty());
Assert.assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(
new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer i) {
return i > 1;
}
}).isEmpty());
}

@Test
public void cartesian() {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
9 changes: 9 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -52,6 +52,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
assert(!nums.isEmpty())
assert(nums.max() === 4)
assert(nums.min() === 1)
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
@@ -545,6 +546,14 @@
assert(sortedTopK === nums.sorted(ord).take(5))
}

test("isEmpty") {
assert(sc.emptyRDD.isEmpty())
assert(sc.parallelize(Seq[Int]()).isEmpty())
assert(!sc.parallelize(Seq(1)).isEmpty())
assert(sc.parallelize(Seq(1, 2, 3), 3).filter(_ < 0).isEmpty())
assert(!sc.parallelize(Seq(1, 2, 3), 3).filter(_ > 1).isEmpty())
}

test("sample preserves partitioner") {
val partitioner = new HashPartitioner(2)
val rdd = sc.parallelize(Seq((0, 1), (2, 3))).partitionBy(partitioner)
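
A caveat worth keeping in mind alongside these tests (an illustrative sketch, again assuming an active SparkContext sc): isEmpty() is an action. Unless the RDD has zero partitions it launches a job, and take(1) scans partitions in growing batches until it finds an element, so proving a nearly-empty RDD non-empty can still cost close to a full scan.

// Only the last few elements survive the filter, so take(1) may have to
// scan many partitions before it can return a first element.
val nearlyEmpty = sc.parallelize(1 to 1000000, 100).filter(_ > 999990)
assert(!nearlyEmpty.isEmpty())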
4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
@@ -82,6 +82,10 @@ object MimaExcludes {
// SPARK-5166 Spark SQL API stabilization
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Transformer.transform"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Estimator.fit")
) ++ Seq(
// SPARK-5270
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDDLike.isEmpty")
)

case v if v.startsWith("1.2") =>
12 changes: 12 additions & 0 deletions python/pyspark/rdd.py
@@ -1130,6 +1130,18 @@ def first(self):
return rs[0]
raise ValueError("RDD is empty")

def isEmpty(self):
"""
Returns true if and only if the RDD contains no elements at all. Note that an RDD
may be empty even when it has at least 1 partition.

>>> sc.parallelize([]).isEmpty()
True
>>> sc.parallelize([1]).isEmpty()
False
"""
return self._jrdd.partitions().size() == 0 or len(self.take(1)) == 0

def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
"""
Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
