From 5d9f5b839d35335c92f6677283b8128b64063f0f Mon Sep 17 00:00:00 2001 From: witgo Date: Fri, 11 Apr 2014 00:08:18 +0800 Subject: [PATCH 01/15] Add the lifecycle interface --- .../main/java/org/apache/spark/Service.java | 97 +++++++++++++++++++ .../scala/org/apache/spark/Lifecycle.scala | 68 +++++++++++++ .../spark/scheduler/SchedulerBackend.scala | 7 +- .../CoarseGrainedSchedulerBackend.scala | 10 +- .../cluster/mesos/MesosSchedulerBackend.scala | 12 +-- .../spark/scheduler/local/LocalBackend.scala | 11 ++- 6 files changed, 186 insertions(+), 19 deletions(-) create mode 100644 core/src/main/java/org/apache/spark/Service.java create mode 100644 core/src/main/scala/org/apache/spark/Lifecycle.scala diff --git a/core/src/main/java/org/apache/spark/Service.java b/core/src/main/java/org/apache/spark/Service.java new file mode 100644 index 0000000000000..a41524f09f35b --- /dev/null +++ b/core/src/main/java/org/apache/spark/Service.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark; + +import java.io.Closeable; +import java.io.IOException; + +// Copied from Hadoop +public interface Service extends Closeable { + + /** + * Service states + */ + public enum State { + /** + * Constructed but not initialized + */ + Uninitialized(0, "Uninitialized"), + + /** + * Initialized but not started or stopped + */ + Initialized(1, "Initialized"), + + /** + * started and not stopped + */ + Started(2, "Started"), + + /** + * stopped. No further state transitions are permitted + */ + Stopped(3, "Stopped"); + + /** + * An integer value for use in array lookup and JMX interfaces. + * Although {@link Enum#ordinal()} could do this, explicitly + * identifying the numbers gives more stability guarantees over time. + */ + private final int value; + + /** + * A name of the state that can be used in messages + */ + private final String stateName; + + private State(int value, String name) { + this.value = value; + this.stateName = name; + } + + /** + * Get the integer value of a state + * + * @return the numeric value of the state + */ + public int getValue() { + return value; + } + + /** + * Get the name of a state + * + * @return the state's name + */ + @Override + public String toString() { + return stateName; + } + } + + void initialize(); + + void start(); + + void stop(); + + void close() throws IOException; + + SparkConf conf(); + + State state(); +} diff --git a/core/src/main/scala/org/apache/spark/Lifecycle.scala b/core/src/main/scala/org/apache/spark/Lifecycle.scala new file mode 100644 index 0000000000000..3a0d32b7f320d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/Lifecycle.scala @@ -0,0 +1,68 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. 
+* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark + +trait Lifecycle extends Service { + + import Service.State._ + + private var state_ = Uninitialized + + def uninitialized = state_ == Uninitialized + + def initialized = state_ == Initialized + + def started = state_ == Started + + def stopped = state_ == Stopped + + def state: Service.State = state_ + + def initialize(): Unit = synchronized { + require(uninitialized, s"Can't move to initialized state when $state_") + doInitialize + state_ = Initialized + } + + override def start(): Unit = synchronized { + if (uninitialized) initialize() + require(initialized || stopped, s"Can't move to started state when $state_") + doStart() + state_ = Started + } + + override def stop(): Unit = synchronized { + require(started, s"Can't move to stopped state when $state_") + doStop + state_ = Stopped + } + + override def close(): Unit = synchronized { + stop() + } + + def conf: SparkConf + + protected def doInitialize(): Unit = {} + + protected def doStart(): Unit + + protected def doStop(): Unit + + +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index f1924a4573b21..deafd217b1718 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ 
-17,15 +17,16 @@ package org.apache.spark.scheduler +import org.apache.spark.Service + /** * A backend interface for scheduling systems that allows plugging in different ones under * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as * machines become available and can launch tasks on them. */ -private[spark] trait SchedulerBackend { - def start(): Unit - def stop(): Unit +private[spark] trait SchedulerBackend extends Service { def reviveOffers(): Unit + def defaultParallelism(): Int def killTask(taskId: Long, executorId: String): Unit = throw new UnsupportedOperationException diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7bfc30b4208a3..aa461df4157fe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -27,7 +27,7 @@ import akka.actor._ import akka.pattern.ask import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.{Logging, SparkException, TaskState} +import org.apache.spark.{Lifecycle, Logging, SparkException, TaskState} import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -42,11 +42,11 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem) - extends SchedulerBackend with Logging + extends SchedulerBackend with Logging with Lifecycle { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - val conf = scheduler.sc.conf 
+ override def conf = scheduler.conf private val timeout = AkkaUtils.askTimeout(conf) class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { @@ -165,7 +165,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A var driverActor: ActorRef = null val taskIdsOnSlave = new HashMap[String, HashSet[String]] - override def start() { + def doStart() { val properties = new ArrayBuffer[(String, String)] for ((key, value) <- scheduler.sc.conf.getAll) { if (key.startsWith("spark.")) { @@ -190,7 +190,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } - override def stop() { + def doStop() { stopExecutors() try { if (driverActor != null) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index f878ae338fc95..14f38f12bd3d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -29,7 +29,7 @@ import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} -import org.apache.spark.{Logging, SparkContext, SparkException, TaskState} +import org.apache.spark._ import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer} import org.apache.spark.util.Utils @@ -42,7 +42,7 @@ private[spark] class MesosSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext, master: String) - extends SchedulerBackend + extends SchedulerBackend with Lifecycle with MScheduler with Logging { @@ -62,8 +62,9 @@ private[spark] class MesosSchedulerBackend( var classLoader: ClassLoader = null - override def start() { - synchronized { + override def 
conf = scheduler.conf + + override def doStart() { classLoader = Thread.currentThread.getContextClassLoader new Thread("MesosSchedulerBackend driver") { @@ -82,7 +83,6 @@ private[spark] class MesosSchedulerBackend( }.start() waitForRegister() - } } def createExecutorInfo(execId: String): ExecutorInfo = { @@ -296,7 +296,7 @@ private[spark] class MesosSchedulerBackend( } } - override def stop() { + override def doStop() { if (driver != null) { driver.stop() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 16e2f5cf3076d..68698fe689e3a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import akka.actor.{Actor, ActorRef, Props} -import org.apache.spark.{Logging, SparkEnv, TaskState} +import org.apache.spark.{Lifecycle, Logging, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.{Executor, ExecutorBackend} import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer} @@ -80,18 +80,19 @@ private[spark] class LocalActor( * on a single Executor (created by the LocalBackend) running locally. */ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int) - extends SchedulerBackend with ExecutorBackend { + extends SchedulerBackend with ExecutorBackend with Lifecycle { var localActor: ActorRef = null - override def start() { + override def conf = scheduler.conf + + override def doStart() { localActor = SparkEnv.get.actorSystem.actorOf( Props(new LocalActor(scheduler, this, totalCores)), "LocalBackendActor") } - override def stop() { - } + override def doStop() {} override def reviveOffers() { localActor ! 
ReviveOffers From 8912b9b39e08339e252d9728f4573748dfd653ab Mon Sep 17 00:00:00 2001 From: witgo Date: Sat, 12 Apr 2014 15:16:09 +0800 Subject: [PATCH 02/15] add Lifecycle interface to HistoryServer ContextCleaner HttpServer UI Sink TaskScheduler --- .../scala/org/apache/spark/ContextCleaner.scala | 14 ++++++-------- .../main/scala/org/apache/spark/HttpServer.scala | 8 +++++--- .../main/scala/org/apache/spark/Lifecycle.scala | 6 ++---- .../scala/org/apache/spark/SecurityManager.scala | 2 +- .../org/apache/spark/deploy/client/AppClient.scala | 10 +++++----- .../spark/deploy/history/HistoryServer.scala | 11 ++++------- .../spark/deploy/master/ui/MasterWebUI.scala | 2 ++ .../spark/deploy/worker/ui/WorkerWebUI.scala | 4 +++- .../apache/spark/metrics/sink/ConsoleSink.scala | 6 ++++-- .../org/apache/spark/metrics/sink/CsvSink.scala | 6 ++++-- .../apache/spark/metrics/sink/GraphiteSink.scala | 6 ++++-- .../org/apache/spark/metrics/sink/JmxSink.scala | 6 ++++-- .../apache/spark/metrics/sink/MetricsServlet.scala | 6 ++++-- .../scala/org/apache/spark/metrics/sink/Sink.scala | 6 +++--- .../org/apache/spark/scheduler/DAGScheduler.scala | 8 +++++--- .../org/apache/spark/scheduler/TaskScheduler.scala | 8 ++------ .../apache/spark/scheduler/TaskSchedulerImpl.scala | 4 ++-- .../scheduler/cluster/SimrSchedulerBackend.scala | 8 ++++---- .../cluster/SparkDeploySchedulerBackend.scala | 8 ++++---- .../mesos/CoarseMesosSchedulerBackend.scala | 8 ++++---- .../main/scala/org/apache/spark/ui/SparkUI.scala | 9 +++++---- .../src/main/scala/org/apache/spark/ui/WebUI.scala | 6 ++++-- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 5 +++-- 23 files changed, 84 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 54e08d7866f75..10de85963dc22 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ 
-51,7 +51,7 @@ private class CleanupTaskWeakReference( * to be processed when the associated object goes out of scope of the application. Actual * cleanup is performed in a separate daemon thread. */ -private[spark] class ContextCleaner(sc: SparkContext) extends Logging { +private[spark] class ContextCleaner(sc: SparkContext) extends Logging with Lifecycle { private val referenceBuffer = new ArrayBuffer[CleanupTaskWeakReference] with SynchronizedBuffer[CleanupTaskWeakReference] @@ -67,27 +67,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { * Whether the cleaning thread will block on cleanup tasks. * This is set to true only for tests. */ - private val blockOnCleanupTasks = sc.conf.getBoolean( + private val blockOnCleanupTasks = conf.getBoolean( "spark.cleaner.referenceTracking.blocking", false) - @volatile private var stopped = false - /** Attach a listener object to get information of when objects are cleaned. */ def attachListener(listener: CleanerListener) { listeners += listener } + def conf = sc.conf + /** Start the cleaner. */ - def start() { + def doStart() { cleaningThread.setDaemon(true) cleaningThread.setName("Spark Context Cleaner") cleaningThread.start() } /** Stop the cleaner. */ - def stop() { - stopped = true - } + def doStop() { } /** Register a RDD for cleanup when it is garbage collected. */ def registerRDDForCleanup(rdd: RDD[_]) { diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 7e9b517f901a2..142bcd0e491ce 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -42,11 +42,13 @@ private[spark] class ServerStateException(message: String) extends Exception(mes * around a Jetty server. 
*/ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager) - extends Logging { + extends Logging with Lifecycle { private var server: Server = null private var port: Int = -1 - def start() { + def conf = securityManager.sparkConf + + def doStart() { if (server != null) { throw new ServerStateException("Server is already started") } else { @@ -117,7 +119,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan sh } - def stop() { + def doStop() { if (server == null) { throw new ServerStateException("Server is already stopped") } else { diff --git a/core/src/main/scala/org/apache/spark/Lifecycle.scala b/core/src/main/scala/org/apache/spark/Lifecycle.scala index 3a0d32b7f320d..de1c61829b444 100644 --- a/core/src/main/scala/org/apache/spark/Lifecycle.scala +++ b/core/src/main/scala/org/apache/spark/Lifecycle.scala @@ -23,6 +23,8 @@ trait Lifecycle extends Service { private var state_ = Uninitialized + def conf: SparkConf + def uninitialized = state_ == Uninitialized def initialized = state_ == Initialized @@ -56,13 +58,9 @@ trait Lifecycle extends Service { stop() } - def conf: SparkConf - protected def doInitialize(): Unit = {} protected def doStart(): Unit protected def doStop(): Unit - - } \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index b52f2d4f416b2..66d853667e54e 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -133,7 +133,7 @@ import org.apache.spark.deploy.SparkHadoopUtil * can take place. 
*/ -private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging { +private[spark] class SecurityManager(val sparkConf: SparkConf) extends Logging { // key used to store the spark secret in the Hadoop UGI private val sparkSecretLookupKey = "sparkCookie" diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 8901806de9262..c9689809b6f63 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -26,7 +26,7 @@ import akka.actor._ import akka.pattern.ask import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.{Logging, SparkConf, SparkException} +import org.apache.spark.{Lifecycle, Logging, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -44,8 +44,8 @@ private[spark] class AppClient( masterUrls: Array[String], appDescription: ApplicationDescription, listener: AppClientListener, - conf: SparkConf) - extends Logging { + val conf: SparkConf) + extends Logging with Lifecycle { val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 @@ -181,12 +181,12 @@ private[spark] class AppClient( } } - def start() { + def doStart() { // Just launch an actor; it will call back into the listener. 
actor = actorSystem.actorOf(Props(new ClientActor)) } - def stop() { + def doStop() { if (actor != null) { try { val timeout = AkkaUtils.askTimeout(conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 97d2ba9deed33..714397b66f395 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -46,7 +46,7 @@ import org.apache.spark.util.Utils */ class HistoryServer( val baseLogDir: String, - conf: SparkConf) + val conf: SparkConf) extends SparkUIContainer("History Server") with Logging { import HistoryServer._ @@ -64,8 +64,6 @@ class HistoryServer( // Number of completed applications found in this directory private var numCompletedApplications = 0 - @volatile private var stopped = false - /** * A background thread that periodically checks for event log updates on disk. * @@ -105,7 +103,7 @@ class HistoryServer( * This starts a background thread that periodically synchronizes information displayed on * this UI with the event logs in the provided base directory. */ - def start() { + override def doStart() { logCheckingThread.start() } @@ -207,9 +205,8 @@ class HistoryServer( } /** Stop the server and close the file system. */ - override def stop() { - super.stop() - stopped = true + override def doStop() { + super.doStop() fileSystem.close() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 30c8ade408a5a..1c73dd9694302 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -59,6 +59,8 @@ class MasterWebUI(val master: Master, requestedPort: Int) ) } + def conf = master.conf + /** Bind to the HTTP server behind this web interface. 
*/ override def bind() { try { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 5625a44549aaa..70bd081a733b5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -39,7 +39,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I private val host = Utils.localHostName() private val port = requestedPort.getOrElse( - worker.conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT)) + conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT)) private val indexPage = new IndexPage(this) private val handlers: Seq[ServletContextHandler] = { @@ -57,6 +57,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I ) } + def conf = worker.conf + /** Bind to the HTTP server behind this web interface. */ override def bind() { try { diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index 05852f1f98993..97fe3f03f91be 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -50,11 +50,13 @@ private[spark] class ConsoleSink(val property: Properties, val registry: MetricR .convertRatesTo(TimeUnit.SECONDS) .build() - override def start() { + def conf = securityMgr.sparkConf + + def doStart() { reporter.start(pollPeriod, pollUnit) } - override def stop() { + def doStop() { reporter.stop() } } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 542dce65366b2..8c0d63447b798 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala 
@@ -59,11 +59,13 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegis .convertRatesTo(TimeUnit.SECONDS) .build(new File(pollDir)) - override def start() { + def conf = securityMgr.sparkConf + + def doStart() { reporter.start(pollPeriod, pollUnit) } - override def stop() { + def doStop() { reporter.stop() } } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index aeb4ad44a0647..11c4025f403d3 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -74,11 +74,13 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric .prefixedWith(prefix) .build(graphite) - override def start() { + def conf = securityMgr.sparkConf + + def doStart() { reporter.start(pollPeriod, pollUnit) } - override def stop() { + def doStop() { reporter.stop() } } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala index ed27234b4e760..4e6d1cb73637d 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -27,11 +27,13 @@ private[spark] class JmxSink(val property: Properties, val registry: MetricRegis val reporter: JmxReporter = JmxReporter.forRegistry(registry).build() - override def start() { + def conf = securityMgr.sparkConf + + def doStart() { reporter.start() } - override def stop() { + def doStop() { reporter.stop() } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index 571539ba5e467..f1c55a0468c25 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ 
b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -54,7 +54,9 @@ private[spark] class MetricsServlet(val property: Properties, val registry: Metr mapper.writeValueAsString(registry) } - override def start() { } + def conf = securityMgr.sparkConf - override def stop() { } + def doStart() { } + + def doStop() { } } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala index 6f2b5a06027ea..f594c035f7dcb 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala @@ -17,7 +17,7 @@ package org.apache.spark.metrics.sink -private[spark] trait Sink { - def start: Unit - def stop: Unit +import org.apache.spark.Lifecycle + +private[spark] trait Sink extends Lifecycle { } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c6cbf14e20069..040ef3397b5f5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -59,7 +59,7 @@ class DAGScheduler( mapOutputTracker: MapOutputTrackerMaster, blockManagerMaster: BlockManagerMaster, env: SparkEnv) - extends Logging { + extends Logging with Lifecycle { import DAGScheduler._ @@ -113,6 +113,8 @@ class DAGScheduler( // stray messages to detect. private val failedEpoch = new HashMap[String, Long] + def conf = env.conf + taskScheduler.setDAGScheduler(this) /** @@ -126,7 +128,7 @@ class DAGScheduler( * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed. */ - def start() { + def doStart() { eventProcessActor = env.actorSystem.actorOf(Props(new Actor { /** * The main event loop of the DAG scheduler. 
@@ -1151,7 +1153,7 @@ class DAGScheduler( Nil } - def stop() { + def doStop() { if (eventProcessActor != null) { eventProcessActor ! StopDAGScheduler } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 92616c997e20c..e9bc41e1711e2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import org.apache.spark.Lifecycle import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** @@ -27,22 +28,17 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode * them, retrying if there are failures, and mitigating stragglers. They return events to the * DAGScheduler. */ -private[spark] trait TaskScheduler { +private[spark] trait TaskScheduler extends Lifecycle { def rootPool: Pool def schedulingMode: SchedulingMode - def start(): Unit - // Invoked after system has successfully initialized (typically in spark context). // Yarn uses this to bootstrap allocation of resources based on preferred locations, // wait for slave registerations, etc. def postStartHook() { } - // Disconnect from the cluster. - def stop(): Unit - // Submit a sequence of tasks to run. 
def submitTasks(taskSet: TaskSet): Unit diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index acd152dda89d4..075726d2edd6c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -125,7 +125,7 @@ private[spark] class TaskSchedulerImpl( def newTaskId(): Long = nextTaskId.getAndIncrement() - override def start() { + def doStart() { backend.start() if (!isLocal && conf.getBoolean("spark.speculation", false)) { @@ -343,7 +343,7 @@ private[spark] class TaskSchedulerImpl( } } - override def stop() { + def doStop() { if (backend != null) { backend.stop() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index d99c76117c168..4f547af2d23d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -35,8 +35,8 @@ private[spark] class SimrSchedulerBackend( val maxCores = conf.getInt("spark.simr.executor.cores", 1) - override def start() { - super.start() + override def doStart() { + super.doStart() val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"), @@ -60,10 +60,10 @@ private[spark] class SimrSchedulerBackend( fs.rename(tmpPath, filePath) } - override def stop() { + override def doStop() { val conf = new Configuration() val fs = FileSystem.get(conf) fs.delete(new Path(driverFilePath), false) - super.stop() + super.doStop() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 
936e9db80573d..a65e85fd08507 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -37,8 +37,8 @@ private[spark] class SparkDeploySchedulerBackend( val maxCores = conf.getOption("spark.cores.max").map(_.toInt) - override def start() { - super.start() + override def doStart() { + super.doStart() // The endpoint for executors to talk to us val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( @@ -55,9 +55,9 @@ private[spark] class SparkDeploySchedulerBackend( client.start() } - override def stop() { + override def doStop() { stopping = true - super.stop() + super.doStop() client.stop() if (shutdownCallback != null) { shutdownCallback(this) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 06b041e1fd9a9..d0518f7b6bb3c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -85,8 +85,8 @@ private[spark] class CoarseMesosSchedulerBackend( id } - override def start() { - super.start() + override def doStart() { + super.doStart() synchronized { new Thread("CoarseMesosSchedulerBackend driver") { @@ -261,8 +261,8 @@ private[spark] class CoarseMesosSchedulerBackend( scheduler.error(message) } - override def stop() { - super.stop() + override def doStop() { + super.doStop() if (driver != null) { driver.stop() } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index dac11ec1cf52f..560270fc56ce2 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -32,7 +32,7 @@ import 
org.apache.spark.util.Utils /** Top level user interface for Spark */ private[spark] class SparkUI( val sc: SparkContext, - conf: SparkConf, + val conf: SparkConf, val listenerBus: SparkListenerBus, var appName: String, val basePath: String = "") @@ -82,11 +82,12 @@ private[spark] class SparkUI( } /** Initialize all components of the server */ - def start() { + override def doStart() { storage.start() jobs.start() env.start() exec.start() + super.doStart() // Storage status listener must receive events first, as other listeners depend on its state listenerBus.addListener(storageStatusListener) @@ -109,8 +110,8 @@ private[spark] class SparkUI( } /** Stop the server behind this web interface. Only valid after bind(). */ - override def stop() { - super.stop() + override def doStop() { + super.doStop() logInfo("Stopped Spark Web UI at %s".format(appUIAddress)) } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 2cc7582eca8a3..6cc4e8d2bba32 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -19,8 +19,9 @@ package org.apache.spark.ui import java.text.SimpleDateFormat import java.util.Date +import org.apache.spark.Lifecycle -private[spark] abstract class WebUI(name: String) { +private[spark] abstract class WebUI(name: String) extends Lifecycle { protected var serverInfo: Option[ServerInfo] = None /** @@ -33,10 +34,11 @@ private[spark] abstract class WebUI(name: String) { def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) /** Stop the server behind this web interface. Only valid after bind(). 
*/ - def stop() { + protected override def doStop() { assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(name)) serverInfo.get.server.stop() } + protected override def doStart() { } } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index db4df1d1212ff..996ec423e92b1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -50,8 +50,9 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont val taskScheduler = new TaskScheduler() { override def rootPool: Pool = null override def schedulingMode: SchedulingMode = SchedulingMode.NONE - override def start() = {} - override def stop() = {} + override def doStart() = {} + override def doStop() = {} + override def conf: SparkConf = null override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) From 3ec1b9f25faa6416d7142330da6a73576f90e9eb Mon Sep 17 00:00:00 2001 From: witgo Date: Sat, 12 Apr 2014 15:28:26 +0800 Subject: [PATCH 03/15] add newline to end --- core/src/main/scala/org/apache/spark/Lifecycle.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/Lifecycle.scala b/core/src/main/scala/org/apache/spark/Lifecycle.scala index de1c61829b444..bcb7dafb6063b 100644 --- a/core/src/main/scala/org/apache/spark/Lifecycle.scala +++ b/core/src/main/scala/org/apache/spark/Lifecycle.scala @@ -63,4 +63,4 @@ trait Lifecycle extends Service { protected def doStart(): Unit protected def doStop(): Unit -} \ No newline at end of file +} From a2822f3ab5c76ccbfec16c5d0f7e9eaa4261e69c Mon Sep 17 00:00:00 2001 From: witgo Date: Sat, 12 Apr 2014 16:56:31 +0800 Subject: [PATCH 04/15] merge master --- 
.../scala/org/apache/spark/deploy/history/HistoryServer.scala | 2 +- .../scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala | 4 +++- .../scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 4 +++- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 2 +- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index bb08c3c65a445..b5df779ec0a3a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -91,7 +91,7 @@ class HistoryServer( * This starts a background thread that periodically synchronizes information displayed on * this UI with the event logs in the provided base directory. */ - def doInitialize() { + override def doInitialize() { attachPage(new HistoryPage(this)) attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static")) logCheckingThread.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 927f8121faec2..571a560ff2114 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -33,8 +33,10 @@ class MasterWebUI(val master: Master, requestedPort: Int) val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) + def conf = master.conf + /** Initialize all components of the server. 
*/ - def doInitialize() { + override def doInitialize() { attachPage(new ApplicationPage(this)) attachPage(new MasterPage(this)) attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 29790959a9e21..9bbb4d22d8cec 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -39,8 +39,10 @@ class WorkerWebUI( val timeout = AkkaUtils.askTimeout(worker.conf) + def conf = worker.conf + /** Initialize all components of the server. */ - def doInitialize() { + override def doInitialize() { val logPage = new LogPage(this) attachPage(logPage) attachPage(new WorkerPage(this)) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index fe99642b7460c..395db147e9126 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -51,7 +51,7 @@ private[spark] class SparkUI( /** Initialize all components of the server. 
*/ - def doInitialize() { + override def doInitialize() { listenerBus.addListener(storageStatusListener) val jobProgressTab = new JobProgressTab(this) attachTab(jobProgressTab) From c2ae34e3c463dd3a8ed02216be68f286f58c0d9b Mon Sep 17 00:00:00 2001 From: witgo Date: Sat, 12 Apr 2014 20:05:20 +0800 Subject: [PATCH 05/15] add Lifecycle interface to StreamingContext JavaStreamingContext JobGenerator NetworkInputTracker StreamingListenerBus --- .../org/apache/spark/ContextCleaner.scala | 4 +-- .../scala/org/apache/spark/HttpServer.scala | 4 +-- .../spark/deploy/client/AppClient.scala | 4 +-- .../spark/deploy/history/HistoryServer.scala | 2 +- .../spark/metrics/sink/ConsoleSink.scala | 4 +-- .../apache/spark/metrics/sink/CsvSink.scala | 4 +-- .../spark/metrics/sink/GraphiteSink.scala | 4 +-- .../apache/spark/metrics/sink/JmxSink.scala | 4 +-- .../spark/metrics/sink/MetricsServlet.scala | 4 +-- .../apache/spark/scheduler/DAGScheduler.scala | 4 +-- .../spark/scheduler/TaskSchedulerImpl.scala | 4 +-- .../CoarseGrainedSchedulerBackend.scala | 4 +-- .../cluster/SimrSchedulerBackend.scala | 4 +-- .../cluster/SparkDeploySchedulerBackend.scala | 10 +++--- .../mesos/CoarseMesosSchedulerBackend.scala | 4 +-- .../cluster/mesos/MesosSchedulerBackend.scala | 4 +-- .../spark/scheduler/local/LocalBackend.scala | 4 +-- .../scala/org/apache/spark/ui/SparkUI.scala | 2 +- .../scala/org/apache/spark/ui/WebUI.scala | 4 +-- .../spark/scheduler/DAGSchedulerSuite.scala | 4 +-- .../spark/metrics/sink/GangliaSink.scala | 4 +-- .../spark/streaming/StreamingContext.scala | 31 +++++-------------- .../api/java/JavaStreamingContext.scala | 19 +++++++++--- .../streaming/scheduler/JobGenerator.scala | 15 +++++++-- .../streaming/scheduler/JobScheduler.scala | 16 +++++++--- .../scheduler/NetworkInputTracker.scala | 18 ++++++----- .../scheduler/StreamingListenerBus.scala | 8 ++--- .../cluster/YarnClientSchedulerBackend.scala | 8 ++--- 28 files changed, 106 insertions(+), 95 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 10de85963dc22..999d34d1b4e09 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -78,14 +78,14 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging with Lifec def conf = sc.conf /** Start the cleaner. */ - def doStart() { + override protected def doStart() { cleaningThread.setDaemon(true) cleaningThread.setName("Spark Context Cleaner") cleaningThread.start() } /** Stop the cleaner. */ - def doStop() { } + override protected def doStop() { } /** Register a RDD for cleanup when it is garbage collected. */ def registerRDDForCleanup(rdd: RDD[_]) { diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 142bcd0e491ce..2d09869f99b0c 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -48,7 +48,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan def conf = securityManager.sparkConf - def doStart() { + override protected def doStart() { if (server != null) { throw new ServerStateException("Server is already started") } else { @@ -119,7 +119,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan sh } - def doStop() { + override protected def doStop() { if (server == null) { throw new ServerStateException("Server is already stopped") } else { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index c9689809b6f63..42a84683579c8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -181,12 +181,12 @@ private[spark] class AppClient( } } - def 
doStart() { + override protected def doStart() { // Just launch an actor; it will call back into the listener. actor = actorSystem.actorOf(Props(new ClientActor)) } - def doStop() { + override protected def doStop() { if (actor != null) { try { val timeout = AkkaUtils.askTimeout(conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index b5df779ec0a3a..9cb9177aac4ec 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -182,7 +182,7 @@ class HistoryServer( } /** Stop the server and close the file system. */ - override def doStop() { + override protected def doStop() { super.doStop() fileSystem.close() } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala index 97fe3f03f91be..d9ffd95ad29b3 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -52,11 +52,11 @@ private[spark] class ConsoleSink(val property: Properties, val registry: MetricR def conf = securityMgr.sparkConf - def doStart() { + override protected def doStart() { reporter.start(pollPeriod, pollUnit) } - def doStop() { + override protected def doStop() { reporter.stop() } } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala index 8c0d63447b798..ea6ea2e12f61e 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -61,11 +61,11 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegis def conf = securityMgr.sparkConf - def doStart() { + override protected def doStart() { 
reporter.start(pollPeriod, pollUnit) } - def doStop() { + override protected def doStop() { reporter.stop() } } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index 11c4025f403d3..d07f1c03c0dfc 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -76,11 +76,11 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric def conf = securityMgr.sparkConf - def doStart() { + override protected def doStart() { reporter.start(pollPeriod, pollUnit) } - def doStop() { + override protected def doStop() { reporter.stop() } } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala index 4e6d1cb73637d..ae693d32da7b4 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -29,11 +29,11 @@ private[spark] class JmxSink(val property: Properties, val registry: MetricRegis def conf = securityMgr.sparkConf - def doStart() { + override protected def doStart() { reporter.start() } - def doStop() { + override protected def doStop() { reporter.stop() } diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala index f1c55a0468c25..00182e2fd4adf 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -56,7 +56,7 @@ private[spark] class MetricsServlet(val property: Properties, val registry: Metr def conf = securityMgr.sparkConf - def doStart() { } + override protected def doStart() { } - def doStop() { } + override protected def doStop() { } } diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 040ef3397b5f5..9a8e0ed452327 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -128,7 +128,7 @@ class DAGScheduler( * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed. */ - def doStart() { + override protected def doStart() { eventProcessActor = env.actorSystem.actorOf(Props(new Actor { /** * The main event loop of the DAG scheduler. @@ -1153,7 +1153,7 @@ class DAGScheduler( Nil } - def doStop() { + override protected def doStop() { if (eventProcessActor != null) { eventProcessActor ! StopDAGScheduler } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 075726d2edd6c..00ee2834023dc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -125,7 +125,7 @@ private[spark] class TaskSchedulerImpl( def newTaskId(): Long = nextTaskId.getAndIncrement() - def doStart() { + override protected def doStart() { backend.start() if (!isLocal && conf.getBoolean("spark.speculation", false)) { @@ -343,7 +343,7 @@ private[spark] class TaskSchedulerImpl( } } - def doStop() { + override protected def doStop() { if (backend != null) { backend.stop() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index aa461df4157fe..65941789c1471 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -165,7 +165,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A var driverActor: ActorRef = null val taskIdsOnSlave = new HashMap[String, HashSet[String]] - def doStart() { + override protected def doStart() { val properties = new ArrayBuffer[(String, String)] for ((key, value) <- scheduler.sc.conf.getAll) { if (key.startsWith("spark.")) { @@ -190,7 +190,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } - def doStop() { + override protected def doStop() { stopExecutors() try { if (driverActor != null) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index 4f547af2d23d3..f7c59bca13a63 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -35,7 +35,7 @@ private[spark] class SimrSchedulerBackend( val maxCores = conf.getInt("spark.simr.executor.cores", 1) - override def doStart() { + override protected def doStart() { super.doStart() val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( @@ -60,7 +60,7 @@ private[spark] class SimrSchedulerBackend( fs.rename(tmpPath, filePath) } - override def doStop() { + override protected def doStop() { val conf = new Configuration() val fs = FileSystem.get(conf) fs.delete(new Path(driverFilePath), false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index a65e85fd08507..d8d72891f9031 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -32,12 +32,11 @@ private[spark] class SparkDeploySchedulerBackend( with Logging { var client: AppClient = null - var stopping = false var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ val maxCores = conf.getOption("spark.cores.max").map(_.toInt) - override def doStart() { + override protected def doStart() { super.doStart() // The endpoint for executors to talk to us @@ -55,8 +54,7 @@ private[spark] class SparkDeploySchedulerBackend( client.start() } - override def doStop() { - stopping = true + override protected def doStop() { super.doStop() client.stop() if (shutdownCallback != null) { @@ -69,13 +67,13 @@ private[spark] class SparkDeploySchedulerBackend( } override def disconnected() { - if (!stopping) { + if (!stopped) { logWarning("Disconnected from Spark cluster! Waiting for reconnection...") } } override def dead() { - if (!stopping) { + if (!stopped) { logError("Spark cluster looks dead, giving up.") scheduler.error("Spark cluster looks down") } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index d0518f7b6bb3c..305f16a939035 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -85,7 +85,7 @@ private[spark] class CoarseMesosSchedulerBackend( id } - override def doStart() { + override protected def doStart() { super.doStart() synchronized { @@ -261,7 +261,7 @@ private[spark] class CoarseMesosSchedulerBackend( scheduler.error(message) } - override def doStop() { + override protected def doStop() { super.doStop() if (driver != null) { driver.stop() diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 6e6fb0c92aad9..4c7f5ef2631d2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -64,7 +64,7 @@ private[spark] class MesosSchedulerBackend( override def conf = scheduler.conf - override def doStart() { + override protected def doStart() { classLoader = Thread.currentThread.getContextClassLoader new Thread("MesosSchedulerBackend driver") { @@ -296,7 +296,7 @@ private[spark] class MesosSchedulerBackend( } } - override def doStop() { + override protected def doStop() { if (driver != null) { driver.stop() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 68698fe689e3a..443cb35491f3d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -86,13 +86,13 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: override def conf = scheduler.conf - override def doStart() { + override protected def doStart() { localActor = SparkEnv.get.actorSystem.actorOf( Props(new LocalActor(scheduler, this, totalCores)), "LocalBackendActor") } - override def doStop() {} + override protected def doStop() {} override def reviveOffers() { localActor ! 
ReviveOffers diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 395db147e9126..585c6ecc9933a 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -78,7 +78,7 @@ private[spark] class SparkUI( } /** Stop the server behind this web interface. Only valid after bind(). */ - override def doStop() { + override protected def doStop() { super.doStop() logInfo("Stopped Spark web UI at %s".format(appUIAddress)) } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 839914c67ff21..fcb656876ce5a 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -107,12 +107,12 @@ private[spark] abstract class WebUI( def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) /** Stop the server behind this web interface. Only valid after bind(). 
*/ - def doStop() { + override protected def doStop() { assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(className)) serverInfo.get.server.stop() } - protected override def doStart() { } + override protected def doStart() { } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 996ec423e92b1..ede380e946f66 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -50,8 +50,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont val taskScheduler = new TaskScheduler() { override def rootPool: Pool = null override def schedulingMode: SchedulingMode = SchedulingMode.NONE - override def doStart() = {} - override def doStop() = {} + override protected def doStart() = { } + override protected def doStop() = { } override def conf: SparkConf = null override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala index d03d7774e8c80..5dd234c3b3eb4 100644 --- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala +++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -75,11 +75,11 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry, .convertRatesTo(TimeUnit.SECONDS) .build(ganglia) - override def start() { + override protected def doStart() { reporter.start(pollPeriod, pollUnit) } - override def stop() { + override protected def doStop() { reporter.stop() } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ff5d0aaa3d0bd..23419ff647af6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -55,7 +55,7 @@ class StreamingContext private[streaming] ( sc_ : SparkContext, cp_ : Checkpoint, batchDur_ : Duration - ) extends Logging { + ) extends Logging with Lifecycle { /** * Create a StreamingContext using an existing SparkContext. @@ -121,7 +121,7 @@ class StreamingContext private[streaming] ( + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)") } - private[streaming] val conf = sc.conf + def conf = sc.conf private[streaming] val env = SparkEnv.get @@ -159,15 +159,6 @@ class StreamingContext private[streaming] ( private[streaming] val uiTab = new StreamingTab(this) - /** Enumeration to identify current state of the StreamingContext */ - private[streaming] object StreamingContextState extends Enumeration { - type CheckpointState = Value - val Initialized, Started, Stopped = Value - } - - import StreamingContextState._ - private[streaming] var state = Initialized - /** * Return the associated Spark context */ @@ -415,18 +406,9 @@ class StreamingContext private[streaming] ( /** * Start the execution of the streams. 
*/ - def start(): Unit = synchronized { - // Throw exception if the context has already been started once - // or if a stopped context is being started again - if (state == Started) { - throw new SparkException("StreamingContext has already been started") - } - if (state == Stopped) { - throw new SparkException("StreamingContext has already been stopped") - } + override protected def doStart() { validate() scheduler.start() - state = Started } /** @@ -466,11 +448,11 @@ class StreamingContext private[streaming] ( def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { // Warn (but not fail) if context is stopped twice, // or context is stopped before starting - if (state == Initialized) { + if (initialized) { logWarning("StreamingContext has not been started yet") return } - if (state == Stopped) { + if (stopped) { logWarning("StreamingContext has already been stopped") return } // no need to throw an exception as its okay to stop twice @@ -478,8 +460,9 @@ class StreamingContext private[streaming] ( logInfo("StreamingContext stopped successfully") waiter.notifyStop() if (stopSparkContext) sc.stop() - state = Stopped + super.stop() } + override protected def doStop() { } } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index c800602d0959b..0c1e81adfed42 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -27,7 +27,7 @@ import java.util.{List => JList, Map => JMap} import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{Lifecycle, SparkConf, SparkContext} import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, 
JavaSparkContext} import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import org.apache.spark.rdd.RDD @@ -48,7 +48,7 @@ import org.apache.spark.streaming.dstream.DStream * respectively. `context.awaitTransformation()` allows the current thread to wait for the * termination of a context by `stop()` or by an exception. */ -class JavaStreamingContext(val ssc: StreamingContext) { +class JavaStreamingContext(val ssc: StreamingContext) extends Lifecycle { /** * Create a StreamingContext. @@ -147,6 +147,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** The underlying SparkContext */ val sparkContext = new JavaSparkContext(ssc.sc) + override def conf = sparkContext.conf + /** * Create an input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited @@ -477,7 +479,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Start the execution of the streams. */ - def start(): Unit = { + override protected def doStart(): Unit = { ssc.start() } @@ -501,15 +503,19 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Stop the execution of the streams. Will stop the associated JavaSparkContext as well. */ - def stop(): Unit = { + override def stop(): Unit = { ssc.stop() + super.stop() } /** * Stop the execution of the streams. * @param stopSparkContext Stop the associated SparkContext or not */ - def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext) + def stop(stopSparkContext: Boolean): Unit = { + ssc.stop(stopSparkContext) + super.stop() + } /** * Stop the execution of the streams. 
@@ -519,7 +525,10 @@ class JavaStreamingContext(val ssc: StreamingContext) { */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = { ssc.stop(stopSparkContext, stopGracefully) + super.stop() } + + override protected def doStop() { } } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index e564eccba2df5..d8cf1f8d22ff7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.scheduler import akka.actor.{ActorRef, ActorSystem, Props, Actor} -import org.apache.spark.{SparkException, SparkEnv, Logging} +import org.apache.spark.{Lifecycle, SparkException, SparkEnv, Logging} import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter} import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock} import scala.util.{Failure, Success, Try} @@ -35,7 +35,7 @@ private[scheduler] case class ClearCheckpointData(time: Time) extends JobGenerat * up DStream metadata. 
*/ private[streaming] -class JobGenerator(jobScheduler: JobScheduler) extends Logging { +class JobGenerator(jobScheduler: JobScheduler) extends Logging with Lifecycle { private val ssc = jobScheduler.ssc private val graph = ssc.graph @@ -66,8 +66,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // last batch whose completion,checkpointing and metadata cleanup has been completed private var lastProcessedBatch: Time = null + def conf = ssc.conf + /** Start generation of jobs */ - def start(): Unit = synchronized { + override protected def doStart(): Unit = { if (eventActor != null) return // generator has already been started eventActor = ssc.env.actorSystem.actorOf(Props(new Actor { @@ -87,6 +89,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { * of current ongoing time interval has been generated, processed and corresponding * checkpoints written. */ + override def stop() { + stop(true) + } + + override def doStop() { } + def stop(processReceivedData: Boolean): Unit = synchronized { if (eventActor == null) return // generator has already been stopped @@ -135,6 +143,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // Stop the actor and checkpoint writer if (shouldCheckpoint) checkpointWriter.stop() ssc.env.actorSystem.stop(eventActor) + super.stop() logInfo("Stopped JobGenerator") } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index d9ada99b472ac..9511da52a1107 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -21,7 +21,7 @@ import scala.util.{Failure, Success, Try} import scala.collection.JavaConversions._ import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors} import akka.actor.{ActorRef, Actor, Props} -import 
org.apache.spark.{SparkException, Logging, SparkEnv} +import org.apache.spark.{Lifecycle, SparkException, Logging, SparkEnv} import org.apache.spark.streaming._ @@ -35,22 +35,23 @@ private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends J * the jobs and runs them using a thread pool. */ private[streaming] -class JobScheduler(val ssc: StreamingContext) extends Logging { +class JobScheduler(val ssc: StreamingContext) extends Logging with Lifecycle { private val jobSets = new ConcurrentHashMap[Time, JobSet] private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs) private val jobGenerator = new JobGenerator(this) val clock = jobGenerator.clock - val listenerBus = new StreamingListenerBus() + val listenerBus = new StreamingListenerBus(ssc.conf) // These two are created only when scheduler starts. // eventActor not being null means the scheduler has been started and not stopped var networkInputTracker: NetworkInputTracker = null private var eventActor: ActorRef = null + def conf = ssc.conf - def start(): Unit = synchronized { + override protected def doStart(): Unit = { if (eventActor != null) return // scheduler has already been started logDebug("Starting JobScheduler") @@ -67,6 +68,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { logInfo("Started JobScheduler") } + override protected def doStop() { } + + override def stop() { + stop(true) + } + def stop(processAllReceivedData: Boolean): Unit = synchronized { if (eventActor == null) return // scheduler has already been stopped logDebug("Stopping JobScheduler") @@ -97,6 +104,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { listenerBus.stop() ssc.env.actorSystem.stop(eventActor) eventActor = null + super.stop() logInfo("Stopped JobScheduler") } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index a1e6f5176825a..bafefea1ad68c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue} import akka.actor._ -import org.apache.spark.{Logging, SparkEnv, SparkException} +import org.apache.spark.{Lifecycle, Logging, SparkEnv, SparkException} import org.apache.spark.SparkContext._ import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.{StreamingContext, Time} @@ -63,7 +63,7 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) * has been called because it needs the final set of input streams at the time of instantiation. */ private[streaming] -class NetworkInputTracker(ssc: StreamingContext) extends Logging { +class NetworkInputTracker(ssc: StreamingContext) extends Logging with Lifecycle { val networkInputStreams = ssc.graph.getNetworkInputStreams() val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) @@ -79,8 +79,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { var actor: ActorRef = null var currentTime: Time = null + override def conf = ssc.conf + /** Start the actor and receiver execution thread. */ - def start() = synchronized { + override protected def doStart() = { if (actor != null) { throw new SparkException("NetworkInputTracker already started") } @@ -94,7 +96,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { } /** Stop the receiver execution thread. 
*/ - def stop() = synchronized { + override protected def doStop() = { if (!networkInputStreams.isEmpty && actor != null) { // First, stop the receivers receiverExecutor.stop() @@ -168,7 +170,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { } /** This thread class runs all the receivers on the cluster. */ - class ReceiverExecutor { + class ReceiverExecutor extends Lifecycle { @transient val env = ssc.env @transient val thread = new Thread() { override def run() { @@ -181,11 +183,13 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { } } - def start() { + override def conf = ssc.conf + + override protected def doStart() { thread.start() } - def stop() { + override protected def doStop() { // Send the stop signal to all the receivers stopReceivers() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index ea03dfc7bfeea..ab266c48d07d9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -17,12 +17,12 @@ package org.apache.spark.streaming.scheduler -import org.apache.spark.Logging +import org.apache.spark.{Lifecycle, SparkConf, Logging} import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import java.util.concurrent.LinkedBlockingQueue /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. 
*/ -private[spark] class StreamingListenerBus() extends Logging { +private[spark] class StreamingListenerBus(val conf: SparkConf) extends Logging with Lifecycle { private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener] @@ -55,7 +55,7 @@ private[spark] class StreamingListenerBus() extends Logging { } } - def start() { + override def doStart() { listenerThread.start() } @@ -91,5 +91,5 @@ private[spark] class StreamingListenerBus() extends Logging { true } - def stop(): Unit = post(StreamingListenerShutdown) + override protected def doStop(): Unit = post(StreamingListenerShutdown) } diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 161918859e7c4..b3847583d814e 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -42,8 +42,8 @@ private[spark] class YarnClientSchedulerBackend( } } - override def start() { - super.start() + override protected def doStart() { + super.doStart() val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") @@ -110,8 +110,8 @@ private[spark] class YarnClientSchedulerBackend( } } - override def stop() { - super.stop() + override protected def doStop() { + super.doStop() client.stop() logInfo("Stoped") } From 384f37e4f689a50737727c9fdc121bb2481a30a1 Mon Sep 17 00:00:00 2001 From: witgo Date: Sun, 13 Apr 2014 03:22:35 +0800 Subject: [PATCH 06/15] fix the test errors --- .../spark/scheduler/TaskSchedulerImplSuite.scala | 7 ++++--- .../apache/spark/streaming/StreamingContext.scala | 7 +++++-- .../streaming/api/java/JavaStreamingContext.scala | 12 ++++++------ .../spark/streaming/StreamingContextSuite.scala | 12 ++++++------ 4 files changed, 21 
insertions(+), 17 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 2fb750d9ee378..f4bf5b5ef9893 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -23,9 +23,10 @@ import org.scalatest.FunSuite import org.apache.spark._ -class FakeSchedulerBackend extends SchedulerBackend { - def start() {} - def stop() {} +class FakeSchedulerBackend extends SchedulerBackend with Lifecycle { + def conf: SparkConf = null + def doStart() {} + def doStop() {} def reviveOffers() {} def defaultParallelism() = 1 } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 23419ff647af6..ac9e7549aeebd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -428,13 +428,16 @@ class StreamingContext private[streaming] ( waiter.waitForStopOrError(timeout) } + override def stop() { + stop(true) + } /** * Stop the execution of the streams immediately (does not wait for all received data * to be processed). 
* @param stopSparkContext Stop the associated SparkContext or not * */ - def stop(stopSparkContext: Boolean = true): Unit = synchronized { + def stop(stopSparkContext: Boolean): Unit = synchronized { stop(stopSparkContext, false) } @@ -448,7 +451,7 @@ class StreamingContext private[streaming] ( def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { // Warn (but not fail) if context is stopped twice, // or context is stopped before starting - if (initialized) { + if (uninitialized || initialized) { logWarning("StreamingContext has not been started yet") return } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 0c1e81adfed42..ec35a4a31a8d8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -504,8 +504,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Lifecycle { * Stop the execution of the streams. Will stop the associated JavaSparkContext as well. 
*/ override def stop(): Unit = { - ssc.stop() - super.stop() + stop(true) } /** @@ -513,8 +512,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Lifecycle { * @param stopSparkContext Stop the associated SparkContext or not */ def stop(stopSparkContext: Boolean): Unit = { - ssc.stop(stopSparkContext) - super.stop() + stop(stopSparkContext,false) } /** @@ -524,8 +522,10 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Lifecycle { * received data to be completed */ def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = { - ssc.stop(stopSparkContext, stopGracefully) - super.stop() + if (ssc.started) { + ssc.stop(stopSparkContext, stopGracefully) + super.stop() + } } override protected def doStop() { } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index efd0d22ecb57a..d3f0641190ae3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming import java.util.concurrent.atomic.AtomicInteger -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} +import org.apache.spark._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, NetworkReceiver} import org.apache.spark.util.{MetadataCleaner, Utils} @@ -126,18 +126,18 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register - assert(ssc.state === ssc.StreamingContextState.Initialized) + assert(ssc.state === Service.State.Uninitialized) ssc.start() - assert(ssc.state === ssc.StreamingContextState.Started) + assert(ssc.state === Service.State.Started) ssc.stop() - assert(ssc.state === ssc.StreamingContextState.Stopped) 
+ assert(ssc.state === Service.State.Stopped) } test("start multiple times") { ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register ssc.start() - intercept[SparkException] { + intercept[IllegalArgumentException] { ssc.start() } } @@ -156,7 +156,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc.stop() // stop before start should not throw exception ssc.start() ssc.stop() - intercept[SparkException] { + intercept[IllegalStateException] { ssc.start() // start after stop should throw exception } } From e8edf36e977a902006d391d6ea0401291bc0705e Mon Sep 17 00:00:00 2001 From: witgo Date: Sun, 13 Apr 2014 13:01:46 +0800 Subject: [PATCH 07/15] Lifecycle throw SparkException --- core/src/main/scala/org/apache/spark/Lifecycle.scala | 12 +++++++++--- .../spark/streaming/StreamingContextSuite.scala | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Lifecycle.scala b/core/src/main/scala/org/apache/spark/Lifecycle.scala index bcb7dafb6063b..9c1f25277d85d 100644 --- a/core/src/main/scala/org/apache/spark/Lifecycle.scala +++ b/core/src/main/scala/org/apache/spark/Lifecycle.scala @@ -36,20 +36,26 @@ trait Lifecycle extends Service { def state: Service.State = state_ def initialize(): Unit = synchronized { - require(uninitialized, s"Can't move to initialized state when $state_") + if (!uninitialized) { + throw new SparkException(s"Can't move to started state when $state_") + } doInitialize state_ = Initialized } override def start(): Unit = synchronized { if (uninitialized) initialize() - require(initialized || stopped, s"Can't move to started state when $state_") + if (started) { + throw new SparkException(s"Can't move to started state when $state_") + } doStart() state_ = Started } override def stop(): Unit = synchronized { - require(started, s"Can't move to stopped state when $state_") + if (!started) { + throw new SparkException(s"Can't move to 
started state when $state_") + } doStop state_ = Stopped } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index d3f0641190ae3..7cb967ebdd941 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -156,7 +156,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc.stop() // stop before start should not throw exception ssc.start() ssc.stop() - intercept[IllegalStateException] { + intercept[SparkException] { ssc.start() // start after stop should throw exception } } From 8342ee2148c20eea1e6f49f00cb2da185e7d2685 Mon Sep 17 00:00:00 2001 From: witgo Date: Sun, 13 Apr 2014 15:00:54 +0800 Subject: [PATCH 08/15] fix test errors --- .../scala/org/apache/spark/SparkContext.scala | 29 ++++++++++++++----- .../spark/streaming/StreamingContext.scala | 9 ++++++ .../streaming/StreamingContextSuite.scala | 4 +-- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5a36e6f5c19a9..0c8ff2af42b6e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -58,7 +58,7 @@ import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerTy */ @DeveloperApi -class SparkContext(config: SparkConf) extends Logging { +class SparkContext(config: SparkConf) extends Logging with Lifecycle { // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. 
It @@ -73,10 +73,10 @@ class SparkContext(config: SparkConf) extends Logging { * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]] * from a list of input files or InputFormats for the application. */ - @DeveloperApi - def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { - this(config) - this.preferredNodeLocationData = preferredNodeLocationData + @DeveloperApi + def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { + this(config) + this.preferredNodeLocationData = preferredNodeLocationData } /** @@ -146,7 +146,7 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) = this(master, appName, sparkHome, jars, Map(), Map()) - private[spark] val conf = config.clone() + val conf = config.clone() /** * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be @@ -928,8 +928,23 @@ class SparkContext(config: SparkConf) extends Logging { addedJars.clear() } + override def start() { + if (started) { + throw new SparkException("SparkContext has already been stopped") + } + super.start() + } + + override protected def doStart() {} + + start() + /** Shut down the SparkContext. */ - def stop() { + override def stop() { + if (started) super.stop() + } + + override protected def doStop() { postApplicationEnd() ui.stop() // Do this only if not stopped already - best case effort. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ac9e7549aeebd..d0d1220f5f240 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -406,6 +406,15 @@ class StreamingContext private[streaming] ( /** * Start the execution of the streams. 
*/ + override def start() { + // Throw exception if the context has already been started once + // or if a stopped context is being started again + if (started) { + throw new SparkException("StreamingContext has already been stopped") + } + super.start() + } + override protected def doStart() { validate() scheduler.start() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 7cb967ebdd941..a2331f712b526 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -137,7 +137,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc = new StreamingContext(master, appName, batchDuration) addInputStream(ssc).register ssc.start() - intercept[IllegalArgumentException] { + intercept[SparkException] { ssc.start() } } @@ -156,7 +156,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc.stop() // stop before start should not throw exception ssc.start() ssc.stop() - intercept[SparkException] { + intercept[IllegalArgumentException] { ssc.start() // start after stop should throw exception } } From 8c5c969c69498c74e415499df1a46ac37638fd9e Mon Sep 17 00:00:00 2001 From: witgo Date: Sun, 13 Apr 2014 17:57:49 +0800 Subject: [PATCH 09/15] fix StreamingContext start state check --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 2 +- .../org/apache/spark/streaming/StreamingContextSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index d0d1220f5f240..939c8fc1b3fe9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -409,7 +409,7 @@ class StreamingContext private[streaming] ( override def start() { // Throw exception if the context has already been started once // or if a stopped context is being started again - if (started) { + if (stopped) { throw new SparkException("StreamingContext has already been stopped") } super.start() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index a2331f712b526..59e2a5f9f3e31 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -156,7 +156,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc.stop() // stop before start should not throw exception ssc.start() ssc.stop() - intercept[IllegalArgumentException] { + intercept[SparkException] { ssc.start() // start after stop should throw exception } } From e2aed59e794808633e23a98dad0b8703feeaf774 Mon Sep 17 00:00:00 2001 From: witgo Date: Sun, 13 Apr 2014 18:05:04 +0800 Subject: [PATCH 10/15] fix SparkContext start state check --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0c8ff2af42b6e..56107ad3cd327 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -929,7 +929,7 @@ class SparkContext(config: SparkConf) extends Logging with Lifecycle { } override def start() { - if (started) { + if (stopped) { throw new SparkException("SparkContext has already been stopped") } super.start() From 7bb00c828e3ec994e7866bd40f68982fad819a2d Mon Sep 17 00:00:00 2001 From: witgo 
Date: Thu, 24 Apr 2014 00:42:46 +0800 Subject: [PATCH 11/15] review commit --- .../apache/spark/streaming/scheduler/ReceiverTracker.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 98b5b4a864c5a..f4e5aae43f425 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -190,11 +190,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging with Lifecycle { } /** This thread class runs all the receivers on the cluster. */ -<<<<<<< HEAD:streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala - class ReceiverExecutor extends Lifecycle { -======= - class ReceiverLauncher { ->>>>>>> 39f85e0322cfecefbc30e7d5a30356cfab1e9640:streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala + class ReceiverLauncher extends Lifecycle { @transient val env = ssc.env @transient val thread = new Thread() { override def run() { From 1b83abbaac9c888b4f29ba668d46bb24fe7399b0 Mon Sep 17 00:00:00 2001 From: witgo Date: Thu, 15 May 2014 18:34:39 +0800 Subject: [PATCH 12/15] fix copywriting error --- core/src/main/scala/org/apache/spark/Lifecycle.scala | 4 ++-- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Lifecycle.scala b/core/src/main/scala/org/apache/spark/Lifecycle.scala index 9c1f25277d85d..a0d5441435c4f 100644 --- a/core/src/main/scala/org/apache/spark/Lifecycle.scala +++ b/core/src/main/scala/org/apache/spark/Lifecycle.scala @@ -37,7 +37,7 @@ trait Lifecycle extends Service { def initialize(): Unit = synchronized { if (!uninitialized) { - throw new SparkException(s"Can't 
move to started state when $state_") + throw new SparkException(s"Can't move to Initialized state when $state_") } doInitialize state_ = Initialized @@ -54,7 +54,7 @@ trait Lifecycle extends Service { override def stop(): Unit = synchronized { if (!started) { - throw new SparkException(s"Can't move to started state when $state_") + throw new SparkException(s"Can't move to Stopped state when $state_") } doStop state_ = Stopped diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 544fd16bb63fe..69d156f4fd312 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -136,6 +136,8 @@ class DAGScheduler( initializeEventProcessActor() } + start() + // Called by TaskScheduler to report task's starting. def taskStarted(task: Task[_], taskInfo: TaskInfo) { eventProcessActor ! BeginEvent(task, taskInfo) From c6118601a1a9ee49efa754e2efe4e0161c7390a4 Mon Sep 17 00:00:00 2001 From: witgo Date: Thu, 15 May 2014 18:46:13 +0800 Subject: [PATCH 13/15] fix copywriting error --- core/src/main/scala/org/apache/spark/Lifecycle.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Lifecycle.scala b/core/src/main/scala/org/apache/spark/Lifecycle.scala index a0d5441435c4f..83c6de17f7e4b 100644 --- a/core/src/main/scala/org/apache/spark/Lifecycle.scala +++ b/core/src/main/scala/org/apache/spark/Lifecycle.scala @@ -37,7 +37,7 @@ trait Lifecycle extends Service { def initialize(): Unit = synchronized { if (!uninitialized) { - throw new SparkException(s"Can't move to Initialized state when $state_") + throw new SparkException(s"Can't move to initialized state when $state_") } doInitialize state_ = Initialized @@ -54,7 +54,7 @@ trait Lifecycle extends Service { override def stop(): Unit = synchronized { if (!started) { - throw new 
SparkException(s"Can't move to Stopped state when $state_") + throw new SparkException(s"Can't move to stopped state when $state_") } doStop state_ = Stopped From 0ec81d9f4dd9da7d5d36ecc00aa7d78e7e397139 Mon Sep 17 00:00:00 2001 From: witgo Date: Fri, 16 May 2014 00:33:15 +0800 Subject: [PATCH 14/15] fix EOFException --- .../main/scala/org/apache/spark/Lifecycle.scala | 2 +- .../cluster/SparkDeploySchedulerBackend.scala | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Lifecycle.scala b/core/src/main/scala/org/apache/spark/Lifecycle.scala index 83c6de17f7e4b..b53d23fd51da2 100644 --- a/core/src/main/scala/org/apache/spark/Lifecycle.scala +++ b/core/src/main/scala/org/apache/spark/Lifecycle.scala @@ -21,7 +21,7 @@ trait Lifecycle extends Service { import Service.State._ - private var state_ = Uninitialized + protected var state_ = Uninitialized def conf: SparkConf diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 90d67a68838f4..933f6e0571518 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -32,12 +32,13 @@ private[spark] class SparkDeploySchedulerBackend( with Logging { var client: AppClient = null + var stopping = false var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ val maxCores = conf.getOption("spark.cores.max").map(_.toInt) - override protected def doStart() { - super.doStart() + override def start() { + super.start() // The endpoint for executors to talk to us val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( @@ -63,8 +64,9 @@ private[spark] class SparkDeploySchedulerBackend( client.start() } - override protected def doStop() { - super.doStop() + override def stop() { + 
stopping = true + super.stop() client.stop() if (shutdownCallback != null) { shutdownCallback(this) @@ -76,13 +78,13 @@ private[spark] class SparkDeploySchedulerBackend( } override def disconnected() { - if (!stopped) { + if (!stopping) { logWarning("Disconnected from Spark cluster! Waiting for reconnection...") } } override def dead(reason: String) { - if (!stopped) { + if (!stopping) { logError("Application has been killed. Reason: " + reason) scheduler.error(reason) } From 2ccf43ce351e219fc64cad9c18248befbb41664b Mon Sep 17 00:00:00 2001 From: witgo Date: Fri, 16 May 2014 00:51:15 +0800 Subject: [PATCH 15/15] fix EOFException --- .../spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 933f6e0571518..59a90766c8614 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -39,6 +39,7 @@ private[spark] class SparkDeploySchedulerBackend( override def start() { super.start() + stopping = false // The endpoint for executors to talk to us val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(