From 87fd3a4130fda47997161830e257c783d02923ec Mon Sep 17 00:00:00 2001 From: Daoyuan Date: Tue, 22 Jul 2014 11:19:20 +0800 Subject: [PATCH 1/2] bagel unpersist old processed rdd --- bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index 70a99b33d753c..ec788275c637c 100644 --- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -72,6 +72,7 @@ object Bagel extends Logging { var verts = vertices var msgs = messages var noActivity = false + var nextUseless: RDD[(K, (V, Array[M]))] = null do { logInfo("Starting superstep " + superstep + ".") val startTime = System.currentTimeMillis @@ -83,6 +84,10 @@ object Bagel extends Logging { val superstep_ = superstep // Create a read-only copy of superstep for capture in closure val (processed, numMsgs, numActiveVerts) = comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel) + if (nextUseless != null) { + nextUseless.unpersist(false) + } + nextUseless = processed val timeTaken = System.currentTimeMillis - startTime logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000)) From 182c9dde8d3212fcb916f5313ee7051070bdb6d1 Mon Sep 17 00:00:00 2001 From: Daoyuan Date: Thu, 24 Jul 2014 09:46:44 +0800 Subject: [PATCH 2/2] rename var nextUseless to lastRDD --- bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index ec788275c637c..ef0bb2ac13f08 100644 --- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -72,7 +72,7 @@ object Bagel extends Logging { var verts = vertices var msgs = messages var noActivity = false - var nextUseless: RDD[(K, (V, Array[M]))] = null + var lastRDD: RDD[(K, (V, Array[M]))] = null do { logInfo("Starting superstep " + superstep + ".") val startTime = System.currentTimeMillis @@ -84,10 +84,10 @@ object Bagel extends Logging { val superstep_ = superstep // Create a read-only copy of superstep for capture in closure val (processed, numMsgs, numActiveVerts) = comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel) - if (nextUseless != null) { - nextUseless.unpersist(false) + if (lastRDD != null) { + lastRDD.unpersist(false) } - nextUseless = processed + lastRDD = processed val timeTaken = System.currentTimeMillis - startTime logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))