[SPARK-12330] [MESOS] Fix mesos coarse mode cleanup #10319

Closed · wants to merge 7 commits
@@ -19,6 +19,7 @@ package org.apache.spark.executor

import java.net.URL
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable
import scala.util.{Failure, Success}
@@ -42,6 +43,7 @@ private[spark] class CoarseGrainedExecutorBackend(
env: SparkEnv)
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None

@@ -102,19 +104,23 @@ private[spark] class CoarseGrainedExecutorBackend(
}

case StopExecutor =>
stopping.set(true)
logInfo("Driver commanded a shutdown")
// Cannot shutdown here because an ack may need to be sent back to the caller. So send
// a message to self to actually do the shutdown.
self.send(Shutdown)

case Shutdown =>
stopping.set(true)
executor.stop()
stop()
rpcEnv.shutdown()
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (driver.exists(_.address == remoteAddress)) {
if (stopping.get()) {
logInfo(s"Driver from $remoteAddress disconnected during shutdown")
} else if (driver.exists(_.address == remoteAddress)) {
logError(s"Driver $remoteAddress disassociated! Shutting down.")
System.exit(1)
} else {
@@ -19,11 +19,13 @@ package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{Collections, List => JList}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}

import com.google.common.base.Stopwatch
import com.google.common.collect.HashBiMap
import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -60,6 +62,12 @@ private[spark] class CoarseMesosSchedulerBackend(
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt

private[this] val shutdownTimeoutMS = conf.getTimeAsMs("spark.mesos.coarse.shutdown.ms", "10s")
Contributor: Can you please submit a new PR to add this to the docs?

Contributor: ah, nvm

Contributor: by the way I just realized this config is not properly named. Unfortunately I did not catch this during code reviews and I just pushed a hot fix in master to correct this: c756bda

.ensuring(_ >= 0, "spark.mesos.coarse.shutdown.ms must be >= 0")
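
For reference, a minimal sketch of how a user might set this timeout in application code (hypothetical usage; the config name is taken as it appears in this PR, and per the review comment above it was later renamed in master by c756bda):

import org.apache.spark.SparkConf

// Sketch only: the value is read via conf.getTimeAsMs, so time suffixes such as "30s" are accepted.
val conf = new SparkConf()
  .setMaster("mesos://zk://host:2181/mesos")    // hypothetical Mesos master URL
  .setAppName("example")                        // hypothetical app name
  .set("spark.mesos.coarse.shutdown.ms", "30s") // allow up to 30s for executors to exit on stop()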

// Synchronization protected by stateLock
private[this] var stopCalled: Boolean = false

// If shuffle service is enabled, the Spark driver will register with the shuffle service.
// This is for cleaning up shuffle files reliably.
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -245,6 +253,13 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
stateLock.synchronized {
if (stopCalled) {
logDebug("Ignoring offers during shutdown")
// Driver should simply return a stopped status on race
// condition between this.stop() and completing here
offers.asScala.map(_.getId).foreach(d.declineOffer)
return
}
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
for (offer <- offers.asScala) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
@@ -364,7 +379,29 @@ }
}

override def stop() {
super.stop()
// Make sure we're not launching tasks during shutdown
stateLock.synchronized {
if (stopCalled) {
logWarning("Stop called multiple times, ignoring")
return
}
stopCalled = true
super.stop()
}
// Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them.
// See SPARK-12330
val stopwatch = new Stopwatch()
Contributor: This Stopwatch constructor was deprecated in newer versions of Guava (google/guava@fd0cbc2). In order to work around this issue, I'd like to remove this use of Stopwatch since we don't use it anywhere else and it doesn't seem to be buying us a whole lot in the way that it's used here.

Contributor Author: Sounds good. Sorry, didn't see this note until now, and it looks like this was already fixed in master.

stopwatch.start()
// slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
while (slaveIdsWithExecutors.nonEmpty &&
stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) {
Thread.sleep(100)
}
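
Following the Stopwatch discussion above, a rough sketch of how this timed wait might look without Guava's deprecated Stopwatch constructor (an assumption, not necessarily the fix that actually landed in master):

// Sketch: replace the Guava Stopwatch with a System.nanoTime-based deadline.
val deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(shutdownTimeoutMS)
while (slaveIdsWithExecutors.nonEmpty && System.nanoTime() - deadlineNanos < 0) {
  Thread.sleep(100) // poll until all executors have unregistered or the timeout expires
}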
Contributor: I thought you used to have a check in the end, that if slaveIdsWithExecutors is non empty we print a warning right? I think that's still valuable to print it.

Contributor Author: sure, will make sure that's in

Contributor Author: added warning if still nonEmpty

if (slaveIdsWithExecutors.nonEmpty) {
logWarning(s"Timed out waiting for ${slaveIdsWithExecutors.size} remaining executors "
+ s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files "
+ "on the mesos nodes.")
}
if (mesosDriver != null) {
mesosDriver.stop()
}