
[SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs

## What changes were proposed in this pull request?

Previously a "java.lang.UnsupportedOperationException: empty
collection" exception would be thrown due to using `reduce`, rather
than `fold` or similar that can tolerate empty RDDs.
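
For illustration, a minimal sketch of the difference (assuming an
existing `SparkContext` named `sc`):

```scala
// A minimal sketch, assuming a SparkContext `sc` is in scope.
val empty = sc.emptyRDD[Long]

// empty.reduce(_ + _)
// => java.lang.UnsupportedOperationException: empty collection

empty.fold(0L)(_ + _) // => 0; fold starts from the supplied zero element
```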

This behaviour has existed for the Vertex RDDs since it was introduced
in b30e0ae, and appears to have been inherited by the Edge RDDs via
copy-paste in ee29ef3.

## How was this patch tested?

Three new unit tests: `count` on empty and non-empty Edge/Vertex RDDs,
and SVD++ on a graph with no edges.

Closes #23681 from huonw/empty-graphx.

Authored-by: Huon Wilson <Huon.Wilson@data61.csiro.au>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
(cherry picked from commit da52698)
Signed-off-by: Sean Owen <sean.owen@databricks.com>
huonw authored and srowen committed Jan 31, 2019
1 parent 94a4b46 commit 537d15ca5edb0f21ce2a15b30866a53675f90382

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (

   /** The number of edges in the RDD. */
   override def count(): Long = {
-    partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
+    partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _)
   }

   override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] =
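
With the fix, counting an empty edge RDD returns 0 instead of throwing.
A minimal usage sketch mirroring the new test below (assuming a
`SparkContext` `sc`; not part of the patch):

```scala
import org.apache.spark.graphx.{Edge, EdgeRDD}

// Sketch: count on an empty EdgeRDD now yields 0 instead of throwing.
val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]])
assert(empty.count() == 0)
```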

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] (

   /** The number of vertices in the RDD. */
   override def count(): Long = {
-    partitionsRDD.map(_.size.toLong).reduce(_ + _)
+    partitionsRDD.map(_.size.toLong).fold(0)(_ + _)
   }

   override private[graphx] def mapVertexPartitions[VD2: ClassTag](

graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -72,7 +72,7 @@ object SVDPlusPlus {

     // calculate global rating mean
     edges.cache()
-    val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
+    val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + b._1, a._2 + b._2))
     val u = rs / rc

     // construct graph
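
Here `fold`'s zero element is the `(sum, count)` pair `(0, 0)`, so the
aggregation no longer throws when `edges` is empty. A standalone sketch
of the pattern (assuming a `SparkContext` `sc`; the values are
illustrative):

```scala
// One-pass (sum, count) aggregation with a tuple zero element.
val ratings = sc.parallelize(Seq(3.0, 4.0, 5.0))
val (rs, rc) = ratings.map(r => (r, 1L)).fold((0.0, 0L))((a, b) => (a._1 + b._1, a._2 + b._2))
val mean = rs / rc // 4.0 here; for an empty RDD, rc == 0 and the mean is NaN
```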

graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

+  test("count") {
+    withSpark { sc =>
+      val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]])
+      assert(empty.count === 0)
+
+      val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ()))
+      val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges))
+      assert(nonempty.count === edges.size)
+    }
+  }
 }

graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -223,4 +223,15 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
       assert(verts.collect().toSeq === data) // test checkpointed RDD
     }
   }
+
+  test("count") {
+    withSpark { sc =>
+      val empty = VertexRDD(sc.emptyRDD[(Long, Unit)])
+      assert(empty.count === 0)
+
+      val n = 100
+      val nonempty = vertices(sc, n)
+      assert(nonempty.count === n + 1)
+    }
+  }
 }

graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
@@ -40,4 +40,13 @@ class SVDPlusPlusSuite extends SparkFunSuite with LocalSparkContext {
     }
   }

+  test("Test SVD++ with no edges") {
+    withSpark { sc =>
+      val edges = sc.emptyRDD[Edge[Double]]
+      val conf = new SVDPlusPlus.Conf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
+      val (graph, _) = SVDPlusPlus.run(edges, conf)
+      assert(graph.vertices.count == 0)
+      assert(graph.edges.count == 0)
+    }
+  }
 }
