From d681f45721c0134566877ab911d415ec02dd9998 Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Tue, 11 Nov 2014 23:40:25 -0800
Subject: [PATCH] Define getPartitions and compute in abstract class for MIMA

---
 .../org/apache/spark/graphx/EdgeRDD.scala       | 18 ++++++++++++++++--
 .../org/apache/spark/graphx/VertexRDD.scala     |  9 +++++++++
 .../apache/spark/graphx/impl/EdgeRDDImpl.scala  | 11 -----------
 .../spark/graphx/impl/VertexRDDImpl.scala       |  9 ---------
 4 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 514432af3f4db..869ef15893eb9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -20,7 +20,9 @@ package org.apache.spark.graphx
 import scala.reflect.ClassTag
 
 import org.apache.spark.Dependency
+import org.apache.spark.Partition
 import org.apache.spark.SparkContext
+import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -37,6 +39,20 @@ import org.apache.spark.graphx.impl.EdgeRDDImpl
 abstract class EdgeRDD[ED, VD](
     @transient sc: SparkContext,
     @transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
+
+  private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]
+
+  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+
+  override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
+    val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
+    if (p.hasNext) {
+      p.next._2.iterator.map(_.copy())
+    } else {
+      Iterator.empty
+    }
+  }
+
   /**
    * Map the values in an edge partitioning preserving the structure but changing the values.
    *
@@ -71,8 +87,6 @@ abstract class EdgeRDD[ED, VD](
       (other: EdgeRDD[ED2, _])
       (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
 
-  private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]
-
   private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
       f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2]
 
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 80bffba8bcd87..f8be17669d892 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -62,6 +62,15 @@ abstract class VertexRDD[VD](
 
   private[graphx] def partitionsRDD: RDD[ShippableVertexPartition[VD]]
 
+  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+
+  /**
+   * Provides the `RDD[(VertexId, VD)]` equivalent output.
+   */
+  override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
+    firstParent[ShippableVertexPartition[VD]].iterator(part, context).next.iterator
+  }
+
   /**
    * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
    * VertexRDD will be based on a different index and can no longer be quickly joined with this
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 1f37d308cb286..4100a85d17ee3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -40,8 +40,6 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
   }
   setName("EdgeRDD")
 
-  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
-
   /**
    * If `partitionsRDD` already has a partitioner, use it. Otherwise assume that the
    * [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new
@@ -50,15 +48,6 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
   override val partitioner =
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
-  override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
-    if (p.hasNext) {
-      p.next._2.iterator.map(_.copy())
-    } else {
-      Iterator.empty
-    }
-  }
-
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 
   /**
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 5055ccabebb2d..08405629bc052 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -38,8 +38,6 @@ class VertexRDDImpl[VD] private[graphx] (
 
   override val partitioner = partitionsRDD.partitioner
 
-  override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
-
   override protected def getPreferredLocations(s: Partition): Seq[String] =
     partitionsRDD.preferredLocations(s)
 
@@ -78,13 +76,6 @@ class VertexRDDImpl[VD] private[graphx] (
     partitionsRDD.map(_.size).reduce(_ + _)
   }
 
-  /**
-   * Provides the `RDD[(VertexId, VD)]` equivalent output.
-   */
-  override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
-    firstParent[ShippableVertexPartition[VD]].iterator(part, context).next.iterator
-  }
-
   override private[graphx] def mapVertexPartitions[VD2: ClassTag](
       f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
     : VertexRDD[VD2] = {
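Context note (not part of the patch): MiMa, the Migration Manager, is the binary-compatibility checker run by Spark's build. Defining getPartitions and compute on the abstract EdgeRDD/VertexRDD classes, in terms of the single abstract member partitionsRDD, keeps those methods attached to the public classes MiMa compares across releases, rather than only to the private[graphx] impl subclasses. Below is a minimal, Spark-free Scala sketch of the pattern; every name in it (SimplePartition, SimpleRDD, SimpleRDDImpl) is a hypothetical stand-in, not Spark API.

```scala
// Illustrative sketch only: it mirrors the shape of this patch. The abstract
// base class fulfills the partition contract once, in terms of one abstract
// member, so the concrete subclass no longer declares getPartitions/compute
// and those members stay on the public base class.
final case class SimplePartition(index: Int)

abstract class SimpleRDD[T] {
  // The one member every concrete subclass must provide (the analogue of
  // partitionsRDD in EdgeRDD/VertexRDD).
  protected def partitionsData: Array[Seq[T]]

  // Defined once here, as the patch does for getPartitions.
  def getPartitions: Array[SimplePartition] =
    Array.tabulate(partitionsData.length)(i => SimplePartition(i))

  // Defined once here, as the patch does for compute: produce one
  // partition's rows by delegating to the backing data.
  def compute(part: SimplePartition): Iterator[T] =
    partitionsData(part.index).iterator
}

// The impl class shrinks to supplying the backing data, as EdgeRDDImpl and
// VertexRDDImpl do after this patch.
class SimpleRDDImpl[T](data: Seq[Seq[T]]) extends SimpleRDD[T] {
  protected val partitionsData: Array[Seq[T]] = data.toArray
}

object Demo extends App {
  val rdd = new SimpleRDDImpl(Seq(Seq(1, 2), Seq(3)))
  rdd.getPartitions.foreach(p => println(rdd.compute(p).toList))
  // prints: List(1, 2) then List(3)
}
```

Two details of the real diff that the sketch omits: EdgeRDD.compute copies each edge via `.map(_.copy())`, consistent with the overridden `collect()` in EdgeRDDImpl and suggesting the underlying EdgePartition iterator reuses Edge objects; and the moved overrides keep their exact signatures (`protected`, `Array[Partition]`, `TaskContext`), so the binary interface of the public classes is unchanged.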