Skip to content

Commit

Permalink
bagel unpersist old processed rdd
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-wang committed Jul 22, 2014
1 parent 7c23c0d commit 87fd3a4
Showing 1 changed file with 5 additions and 0 deletions.
5 changes: 5 additions & 0 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ object Bagel extends Logging {
var verts = vertices
var msgs = messages
var noActivity = false
var nextUseless: RDD[(K, (V, Array[M]))] = null
do {
logInfo("Starting superstep " + superstep + ".")
val startTime = System.currentTimeMillis
Expand All @@ -83,6 +84,10 @@ object Bagel extends Logging {
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
val (processed, numMsgs, numActiveVerts) =
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
if (nextUseless != null) {
nextUseless.unpersist(false)
}
nextUseless = processed

val timeTaken = System.currentTimeMillis - startTime
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
Expand Down

0 comments on commit 87fd3a4

Please sign in to comment.