Skip to content

Commit

Permalink
Define getPartitions and compute in abstract class for MIMA
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdave committed Nov 12, 2014
1 parent 1472390 commit d681f45
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 22 deletions.
18 changes: 16 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
Expand Up @@ -20,7 +20,9 @@ package org.apache.spark.graphx
import scala.reflect.ClassTag

import org.apache.spark.Dependency
import org.apache.spark.Partition
import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

Expand All @@ -37,6 +39,20 @@ import org.apache.spark.graphx.impl.EdgeRDDImpl
abstract class EdgeRDD[ED, VD](
@transient sc: SparkContext,
@transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {

private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]

override protected def getPartitions: Array[Partition] = partitionsRDD.partitions

override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
if (p.hasNext) {
p.next._2.iterator.map(_.copy())
} else {
Iterator.empty
}
}

/**
* Map the values in an edge partitioning preserving the structure but changing the values.
*
Expand Down Expand Up @@ -71,8 +87,6 @@ abstract class EdgeRDD[ED, VD](
(other: EdgeRDD[ED2, _])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]

private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]

private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2]

Expand Down
9 changes: 9 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Expand Up @@ -62,6 +62,15 @@ abstract class VertexRDD[VD](

private[graphx] def partitionsRDD: RDD[ShippableVertexPartition[VD]]

override protected def getPartitions: Array[Partition] = partitionsRDD.partitions

/**
* Provides the `RDD[(VertexId, VD)]` equivalent output.
*/
override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
firstParent[ShippableVertexPartition[VD]].iterator(part, context).next.iterator
}

/**
* Construct a new VertexRDD that is indexed by only the visible vertices. The resulting
* VertexRDD will be based on a different index and can no longer be quickly joined with this
Expand Down
Expand Up @@ -40,8 +40,6 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
}
setName("EdgeRDD")

override protected def getPartitions: Array[Partition] = partitionsRDD.partitions

/**
* If `partitionsRDD` already has a partitioner, use it. Otherwise assume that the
* [[PartitionID]]s in `partitionsRDD` correspond to the actual partitions and create a new
Expand All @@ -50,15 +48,6 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
override val partitioner =
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))

override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
if (p.hasNext) {
p.next._2.iterator.map(_.copy())
} else {
Iterator.empty
}
}

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

/**
Expand Down
Expand Up @@ -38,8 +38,6 @@ class VertexRDDImpl[VD] private[graphx] (

override val partitioner = partitionsRDD.partitioner

override protected def getPartitions: Array[Partition] = partitionsRDD.partitions

override protected def getPreferredLocations(s: Partition): Seq[String] =
partitionsRDD.preferredLocations(s)

Expand Down Expand Up @@ -78,13 +76,6 @@ class VertexRDDImpl[VD] private[graphx] (
partitionsRDD.map(_.size).reduce(_ + _)
}

/**
* Provides the `RDD[(VertexId, VD)]` equivalent output.
*/
override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = {
firstParent[ShippableVertexPartition[VD]].iterator(part, context).next.iterator
}

override private[graphx] def mapVertexPartitions[VD2: ClassTag](
f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
: VertexRDD[VD2] = {
Expand Down

0 comments on commit d681f45

Please sign in to comment.