Skip to content

Commit

Permalink
Try mutating old RDDs for delta updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdave committed Mar 2, 2014
1 parent 4d88030 commit 3407a8f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class ReplicatedVertexView[VD: ClassTag](
prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) {
(prevViewIter, shippedVertsIter) =>
val (pid, prevVPart) = prevViewIter.next()
val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
val newVPart = prevVPart.innerJoinKeepLeftDestructive(shippedVertsIter.flatMap(_._2.iterator))
Iterator((pid, newVPart))
}.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,22 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition(index, newValues, newMask)
}

/**
 * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
 * the partition, hidden by the bitmask.
 *
 * Destructive: matching entries are written directly into this partition's `values` array, and
 * the returned partition is built on that same array, so this partition's stored values change
 * as a side effect of the call.
 *
 * @param iter (vertex id, attribute) pairs to join in; ids for which `index.getPos` returns a
 *             negative position are skipped
 * @return a partition sharing this partition's `index` and `values`, whose mask has a bit set
 *         only at the positions that were written from `iter`
 */
def innerJoinKeepLeftDestructive(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = {
  val newMask = new BitSet(capacity)
  // Read through the Product2 accessors rather than destructuring with `case (vid, vdata)`:
  // a tuple pattern only matches Tuple2 and would throw MatchError for any other Product2.
  iter.foreach { pair =>
    val pos = index.getPos(pair._1)
    if (pos >= 0) {
      newMask.set(pos)
      // In-place overwrite: no copy of `values` is made before mutation.
      values(pos) = pair._2
    }
  }
  new VertexPartition(index, values, newMask)
}

def aggregateUsingIndex[VD2: ClassTag](
iter: Iterator[Product2[VertexId, VD2]],
reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
Expand Down

0 comments on commit 3407a8f

Please sign in to comment.