From e5ad9d3c070a8b9770c0eef23c9d570bffef43d5 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Wed, 27 Aug 2014 00:18:01 +0900
Subject: [PATCH 1/6] Modified Executor to stop SparkEnv when the Executor itself stops

---
 core/src/main/scala/org/apache/spark/SparkEnv.scala   | 10 +++++++---
 .../scala/org/apache/spark/executor/Executor.scala    |  1 +
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index fc36e37c53f5e..e2e81a1c9b6a8 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -66,7 +66,8 @@ class SparkEnv (
     val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
     val shuffleMemoryManager: ShuffleMemoryManager,
-    val conf: SparkConf) extends Logging {
+    val conf: SparkConf,
+    val isDriver : Boolean) extends Logging {
 
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
@@ -81,7 +82,9 @@ class SparkEnv (
     shuffleManager.stop()
     broadcastManager.stop()
     blockManager.stop()
-    blockManager.master.stop()
+    if (isDriver) {
+      blockManager.master.stop()
+    }
     metricsSystem.stop()
     actorSystem.shutdown()
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
@@ -282,7 +285,8 @@ object SparkEnv extends Logging {
       sparkFilesDir,
       metricsSystem,
       shuffleMemoryManager,
-      conf)
+      conf,
+      isDriver)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2f76e532aeb76..58429bee56900 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -122,6 +122,7 @@ private[spark] class Executor(
     env.metricsSystem.report()
     isStopped = true
     threadPool.shutdown()
+    env.stop()
   }
 
   class TaskRunner(

From 6058a58bdf670327252ef613e531f4ca734a097b Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Wed, 27 Aug 2014 01:37:32 +0900
Subject: [PATCH 2/6] Modified Executor not to invoke SparkEnv#stop in local mode

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 58429bee56900..7603d8137cf29 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -122,7 +122,9 @@ private[spark] class Executor(
     env.metricsSystem.report()
     isStopped = true
     threadPool.shutdown()
-    env.stop()
+    if (!isLocal) {
+      env.stop()
+    }
   }
 
   class TaskRunner(
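The net effect of the first two patches on Executor#stop is small but easy to misread: env.stop() is only safe when the executor owns its SparkEnv. In local mode the executor shares the driver's SparkEnv, which SparkContext#stop tears down, so the executor must leave it alone. The following self-contained Scala model mirrors the resulting control flow; the names (Env, ExecutorModel, ShutdownDemo) are toy stand-ins, not Spark's classes.

// Minimal model of executor shutdown after patches 1 and 2.
class Env {
  def stop(): Unit = println("env stopped")
}

class ExecutorModel(env: Env, isLocal: Boolean) {
  def stop(): Unit = {
    println("thread pool shut down")
    // In local mode the driver and executor share one Env; the driver
    // owns its teardown, so only a remote executor stops it here.
    if (!isLocal) {
      env.stop()
    }
  }
}

object ShutdownDemo extends App {
  new ExecutorModel(new Env, isLocal = false).stop()  // remote: stops its Env
  new ExecutorModel(new Env, isLocal = true).stop()   // local: leaves Env alive
}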
From 4da8535b81bbf0afcc470f46b914fc8935d3dfb3 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Mon, 1 Sep 2014 13:44:26 +0900
Subject: [PATCH 3/6] Modified BlockManagerMaster#stop to send StopBlockManagerMaster message only when the sender is the driver

---
 core/src/main/scala/org/apache/spark/SparkEnv.scala           | 4 +---
 .../scala/org/apache/spark/storage/BlockManagerMaster.scala   | 5 +++--
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index cf9ee8f39b49f..67086c50ce092 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -82,9 +82,7 @@ class SparkEnv (
     shuffleManager.stop()
     broadcastManager.stop()
     blockManager.stop()
-    if (isDriver) {
-      blockManager.master.stop()
-    }
+    blockManager.master.stop()
     metricsSystem.stop()
     actorSystem.shutdown()
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e67b3dc5ce02e..5cf2f243cfd19 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -22,7 +22,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 
 import akka.actor._
 
-import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils
 
@@ -196,7 +196,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
 
   /** Stop the driver actor, called only on the Spark driver node */
   def stop() {
-    if (driverActor != null) {
+    val env = SparkEnv.get
+    if (driverActor != null && env != null && env.isDriver) {
       tell(StopBlockManagerMaster)
       driverActor = null
       logInfo("BlockManagerMaster stopped")

From 889e2d16bd7db75e286fde5661c145c8f4bea284 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Mon, 1 Sep 2014 20:43:59 +0900
Subject: [PATCH 4/6] Modified BlockManagerMaster to be able to be passed the isDriver flag

---
 core/src/main/scala/org/apache/spark/SparkEnv.scala            | 8 +++-----
 .../org/apache/spark/storage/BlockManagerMaster.scala          | 5 ++---
 .../scala/org/apache/spark/storage/ThreadingTest.scala         | 2 +-
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala         | 2 +-
 .../org/apache/spark/storage/BlockManagerSuite.scala           | 2 +-
 5 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 67086c50ce092..2973d002cc428 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -66,8 +66,7 @@ class SparkEnv (
     val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
     val shuffleMemoryManager: ShuffleMemoryManager,
-    val conf: SparkConf,
-    val isDriver : Boolean) extends Logging {
+    val conf: SparkConf) extends Logging {
 
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
@@ -226,7 +225,7 @@ object SparkEnv extends Logging {
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
-      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
+      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
 
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
       serializer, conf, securityManager, mapOutputTracker, shuffleManager)
@@ -286,8 +285,7 @@ object SparkEnv extends Logging {
       sparkFilesDir,
       metricsSystem,
       shuffleMemoryManager,
-      conf,
-      isDriver)
+      conf)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 5cf2f243cfd19..2522562942297 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -27,7 +27,7 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils
 
 private[spark]
-class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
+class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf, isDriver: Boolean) extends Logging {
 
   private val AKKA_RETRY_ATTEMPTS: Int = AkkaUtils.numRetries(conf)
   private val AKKA_RETRY_INTERVAL_MS: Int = AkkaUtils.retryWaitMs(conf)
 
@@ -196,8 +196,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
 
   /** Stop the driver actor, called only on the Spark driver node */
   def stop() {
-    val env = SparkEnv.get
-    if (driverActor != null && env != null && env.isDriver) {
+    if (driverActor != null && isDriver) {
       tell(StopBlockManagerMaster)
       driverActor = null
       logInfo("BlockManagerMaster stopped")
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index aa83ea90ee9ee..7540f0d5e2a5a 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -99,7 +99,7 @@ private[spark] object ThreadingTest {
     val serializer = new KryoSerializer(conf)
     val blockManagerMaster = new BlockManagerMaster(
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
-      conf)
+      conf, true)
     val blockManager = new BlockManager(
       "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
       new SecurityManager(conf), new MapOutputTrackerMaster(conf), new HashShuffleManager(conf))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 1a42fc1b233ba..0bb91febde9d7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -120,7 +120,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
    */
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
   // stub out BlockManagerMaster.getLocations to use our cacheLocations
-  val blockManagerMaster = new BlockManagerMaster(null, conf) {
+  val blockManagerMaster = new BlockManagerMaster(null, conf, true) {
       override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
        blockIds.map {
          _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 14ffadab99cae..c200654162268 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -93,7 +93,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
     master = new BlockManagerMaster(
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
-      conf)
+      conf, true)
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
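Patch 3 consults the SparkEnv.get global to decide whether stop() should actually message the master actor; patch 4 replaces that lookup with an isDriver constructor parameter, which drops the null check against global state and keeps BlockManagerMaster constructible in tests without a live SparkEnv (hence the three test-file updates). A self-contained toy of the final guard follows; MasterModel and GuardDemo are illustrative names, not Spark's API.

// Toy version of the guard after patch 4: the driver flag arrives via
// the constructor, so no global environment lookup is needed.
class MasterModel(var driverActor: AnyRef, isDriver: Boolean) {
  def stop(): Unit = {
    if (driverActor != null && isDriver) {
      println("sending StopBlockManagerMaster")
      driverActor = null
    }
  }
}

object GuardDemo extends App {
  new MasterModel(new AnyRef, isDriver = true).stop()   // driver: sends the message
  new MasterModel(new AnyRef, isDriver = false).stop()  // executor: no-op
}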
From 039b7479cf97753432f73bd4e0b3fd8d6315f9b3 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Mon, 1 Sep 2014 20:54:39 +0900
Subject: [PATCH 5/6] Modified style

---
 .../scala/org/apache/spark/storage/BlockManagerMaster.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 2522562942297..7c6e185a4c6f9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -27,7 +27,8 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils
 
 private[spark]
-class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf, isDriver: Boolean) extends Logging {
+class BlockManagerMaster(var driverActor: ActorRef,
+    conf: SparkConf, isDriver: Boolean) extends Logging {
 
   private val AKKA_RETRY_ATTEMPTS: Int = AkkaUtils.numRetries(conf)
   private val AKKA_RETRY_INTERVAL_MS: Int = AkkaUtils.retryWaitMs(conf)

From d3005fdce54b60713bd139cd74cc4380178518ad Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Tue, 2 Sep 2014 12:14:01 +0900
Subject: [PATCH 6/6] Modified Class definition format of BlockManagerMaster

---
 .../org/apache/spark/storage/BlockManagerMaster.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 7c6e185a4c6f9..2e262594b3538 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -22,13 +22,16 @@ import scala.concurrent.ExecutionContext.Implicits.global
 
 import akka.actor._
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
+import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils
 
 private[spark]
-class BlockManagerMaster(var driverActor: ActorRef,
-    conf: SparkConf, isDriver: Boolean) extends Logging {
+class BlockManagerMaster(
+    var driverActor: ActorRef,
+    conf: SparkConf,
+    isDriver: Boolean)
+  extends Logging {
 
   private val AKKA_RETRY_ATTEMPTS: Int = AkkaUtils.numRetries(conf)
   private val AKKA_RETRY_INTERVAL_MS: Int = AkkaUtils.retryWaitMs(conf)
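Patches 5 and 6 are formatting-only (they also drop the SparkEnv import that patch 4 left unused). The behavior after the full series: SparkEnv#stop unconditionally calls blockManager.master.stop(), and the isDriver flag inside BlockManagerMaster decides whether the StopBlockManagerMaster message is actually sent, so the same shutdown path is safe on both driver and executor. A self-contained model of that end state, with illustrative names rather than Spark's:

// Model of the series' net effect: both sides run the same stop path,
// but only the driver's master actually shuts the actor down.
class BlockManagerMasterModel(isDriver: Boolean) {
  private var driverActor: AnyRef = new AnyRef

  def stop(): Unit = {
    if (driverActor != null && isDriver) {
      println("StopBlockManagerMaster sent")  // driver only
      driverActor = null
    }
  }
}

class EnvModel(isDriver: Boolean) {
  private val master = new BlockManagerMasterModel(isDriver)

  def stop(): Unit = {
    master.stop()  // safe on both driver and executor
    println(s"env stopped (isDriver=$isDriver)")
  }
}

object NetEffectDemo extends App {
  new EnvModel(isDriver = true).stop()   // sends the stop message
  new EnvModel(isDriver = false).stop()  // master.stop() is a no-op
}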