Skip to content

Commit

Permalink
Add unpersist option to Pregel
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdave committed Apr 30, 2014
1 parent 3bcaa2f commit ad6590c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
11 changes: 7 additions & 4 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ object Pregel extends Logging {
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
activeDirection: EdgeDirection = EdgeDirection.Either,
unpersist: Boolean = true)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
Expand Down Expand Up @@ -154,9 +155,11 @@ object Pregel extends Logging {
logWarning("Pregel finished iteration " + i)

// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking=false)
newVerts.unpersist(blocking=false)
prevG.unpersistVertices(blocking=false)
if (unpersist) {
oldMessages.unpersist(blocking=false)
newVerts.unpersist(blocking=false)
prevG.unpersistVertices(blocking=false)
}
// count the iteration
// logWarning(s"Pregel iteration $i")
// println(s"Pregel iteration $i")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ object Analytics extends Logging {
var numEPart = 4
var partitionStrategy: Option[PartitionStrategy] = None
var numIterOpt: Option[Int] = None
var unpersist: Boolean = true

options.foreach{
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
case ("numEPart", v) => numEPart = v.toInt
case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case ("numIter", v) => numIterOpt = Some(v.toInt)
case ("unpersist", v) => unpersist = v.toBoolean
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}

Expand All @@ -86,7 +88,7 @@ object Analytics extends Logging {
println("GRAPHX: Number of edges " + graph.edges.count)

val pr = (numIterOpt match {
case Some(numIter) => PageRank.run(graph, numIter)
case Some(numIter) => PageRank.run(graph, numIter, unpersist = unpersist)
case None => PageRank.runUntilConvergence(graph, tol)
}).vertices.cache()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ object PageRank extends Logging {
*
*/
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
{
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, unpersist: Boolean = true)
: Graph[Double, Double] = {
// Initialize the pagerankGraph with each edge attribute having
// weight 1/outDegree and each vertex with attribute 1.0.
val pagerankGraph: Graph[Double, Double] = graph
Expand All @@ -102,8 +102,8 @@ object PageRank extends Logging {

logWarning("About to execute Pregel.")
// Execute pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection.Out)(
vertexProgram, sendMessage, messageCombiner)
Pregel(pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection.Out,
unpersist = unpersist)(vertexProgram, sendMessage, messageCombiner)
}

/**
Expand Down

0 comments on commit ad6590c

Please sign in to comment.