Skip to content

Commit

Permalink
Make mutable RDDs optional
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdave committed Mar 28, 2014
1 parent e27da44 commit 540c267
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 5 deletions.
2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* }
* }}}
*/
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)], destructive: Boolean = false)
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]

Expand Down
2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ object Pregel extends Logging {
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
// Update the graph with the new vertices.
prevG = g
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
g = g.outerJoinVertices(newVerts, destructive = i > 0) { (vid, old, newOpt) => newOpt.getOrElse(old) }
g.cache()

val oldMessages = messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
} // end of mapReduceTriplets

override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
(other: RDD[(VertexId, U)])
(other: RDD[(VertexId, U)], destructive: Boolean = false)
(updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] =
{
if (classTag[VD] equals classTag[VD2]) {
Expand All @@ -290,7 +290,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newReplicatedVertexView = new ReplicatedVertexView[VD2](
changedVerts, edges, routingTable,
Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]), destructive)
new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class ReplicatedVertexView[VD: ClassTag](
updatedVerts: VertexRDD[VD],
edges: EdgeRDD[_],
routingTable: RoutingTable,
prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
prevViewOpt: Option[ReplicatedVertexView[VD]] = None,
destructive: Boolean = false) {

/**
* Within each edge partition, create a local map from vid to an index into the attribute
Expand Down Expand Up @@ -122,6 +123,7 @@ class ReplicatedVertexView[VD: ClassTag](
val shippedVerts = routingTable.get(includeSrc, includeDst)
.zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdTag))
.partitionBy(edges.partitioner.get)
val destructiveLocal = destructive // to avoid closure capture
// TODO: Consider using a specialized shuffler.

prevViewOpt match {
Expand Down

0 comments on commit 540c267

Please sign in to comment.