
Commit

Merge remote-tracking branch 'remotes/origin/master' into fix-second-minute-truc

MaxGekk committed Feb 17, 2020
2 parents b7ca731 + 29b3e42 commit 76ca700
Showing 126 changed files with 1,916 additions and 987 deletions.
2 changes: 2 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE
@@ -6,6 +6,8 @@ Thanks for sending a pull request! Here are some tips for you:
4. Be sure to keep the PR description updated to reflect all changes.
5. Please write your PR title to summarize what this PR proposes.
6. If possible, provide a concise example to reproduce the issue for a faster review.
7. If you want to add a new configuration, please read the guideline first for naming configurations in
'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
-->

### What changes were proposed in this pull request?
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -60,6 +60,15 @@ private[deploy] object DeployMessages {
assert (port > 0)
}

/**
* @param id the worker id
* @param worker the worker endpoint ref
*/
case class WorkerDecommission(
id: String,
worker: RpcEndpointRef)
extends DeployMessage

case class ExecutorStateChanged(
appId: String,
execId: Int,
@@ -149,6 +158,8 @@ private[deploy] object DeployMessages {

case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
@@ -19,9 +19,13 @@ package org.apache.spark.deploy

private[deploy] object ExecutorState extends Enumeration {

val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED, DECOMMISSIONED = Value

type ExecutorState = Value

def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
// DECOMMISSIONED isn't listed as finished since we don't want to remove the executor from
// the worker while the executor still exists - but we do want to avoid scheduling new tasks on it.
private val finishedStates = Seq(KILLED, FAILED, LOST, EXITED)

def isFinished(state: ExecutorState): Boolean = finishedStates.contains(state)
}
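As a quick illustration of the intended semantics (a hypothetical check, not part of this commit), the new state is deliberately not "finished", while the pre-existing terminal states still are:

package org.apache.spark.deploy

// Hypothetical sanity check, not part of this diff: a DECOMMISSIONED executor is kept
// around by the worker, unlike executors in the terminal states.
private[deploy] object ExecutorStateSemanticsExample {
  def main(args: Array[String]): Unit = {
    assert(!ExecutorState.isFinished(ExecutorState.DECOMMISSIONED))
    assert(ExecutorState.isFinished(ExecutorState.EXITED))
  }
}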
@@ -180,6 +180,8 @@ private[spark] class StandaloneAppClient(
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
} else if (state == ExecutorState.DECOMMISSIONED) {
listener.executorDecommissioned(fullId, message.getOrElse(""))
}

case WorkerRemoved(id, host, message) =>
@@ -39,5 +39,7 @@ private[spark] trait StandaloneAppClientListener {
def executorRemoved(
fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit

def executorDecommissioned(fullId: String, message: String): Unit

def workerRemoved(workerId: String, host: String, message: String): Unit
}
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -243,6 +243,15 @@ private[deploy] class Master(
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)

case WorkerDecommission(id, workerRef) =>
logInfo("Recording worker %s decommissioning".format(id))
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else {
// We use foreach since get returns an Option, so worker ids that are no longer registered are skipped.
idToWorker.get(id).foreach(decommissionWorker)
}

case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
masterAddress, resources) =>
@@ -313,7 +322,9 @@ private[deploy] class Master(
// Only retry certain number of times so we don't go into an infinite loop.
// Important note: this code path is not exercised by tests, so be very careful when
// changing this `if` condition.
// We also don't count failures from decommissioned workers since they are "expected."
if (!normalExit
&& oldState != ExecutorState.DECOMMISSIONED
&& appInfo.incrementRetryCount() >= maxExecutorRetries
&& maxExecutorRetries >= 0) { // < 0 disables this application-killing path
val execs = appInfo.executors.values
@@ -850,6 +861,26 @@ private[deploy] class Master(
true
}

private def decommissionWorker(worker: WorkerInfo): Unit = {
if (worker.state != WorkerState.DECOMMISSIONED) {
logInfo("Decommissioning worker %s on %s:%d".format(worker.id, worker.host, worker.port))
worker.setState(WorkerState.DECOMMISSIONED)
for (exec <- worker.executors.values) {
logInfo("Telling app of decommission executors")
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.DECOMMISSIONED,
Some("worker decommissioned"), None, workerLost = false))
exec.state = ExecutorState.DECOMMISSIONED
exec.application.removeExecutor(exec)
}
// On recovery do not re-register a decommissioned worker
persistenceEngine.removeWorker(worker)
} else {
logWarning("Skipping decommissioning worker %s on %s:%d as worker is already decommissioned".
format(worker.id, worker.host, worker.port))
}
}

private def removeWorker(worker: WorkerInfo, msg: String): Unit = {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
worker.setState(WorkerState.DEAD)
26 changes: 26 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -67,6 +67,14 @@ private[deploy] class Worker(
Utils.checkHost(host)
assert (port > 0)

// If worker decommissioning is enabled, register a handler on SIGPWR to trigger decommissioning.
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
logInfo("Registering SIGPWR handler to trigger decommissioning.")
SignalUtils.register("PWR")(decommissionSelf)
} else {
logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
}

// A scheduled executor used to send messages at the specified time.
private val forwardMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
@@ -128,6 +136,7 @@
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
private var connected = false
private var decommissioned = false
private val workerId = generateWorkerId()
private val sparkHome =
if (sys.props.contains(IS_TESTING.key)) {
@@ -549,6 +558,8 @@
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, resources_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else if (decommissioned) {
logWarning("Asked to launch an executor while decommissioned. Not launching executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
@@ -672,6 +683,9 @@
case ApplicationFinished(id) =>
finishedApps += id
maybeCleanupApplication(id)

case DecommissionSelf =>
decommissionSelf()
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -771,6 +785,18 @@
}
}

private[deploy] def decommissionSelf(): Boolean = {
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
logDebug("Decommissioning self")
decommissioned = true
sendToMaster(WorkerDecommission(workerId, self))
} else {
logWarning("Asked to decommission self, but decommissioning not enabled")
}
// Return true since this can be called as a signal handler
true
}

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception
@@ -43,7 +43,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, ThreadUtils, Utils}
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}

private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@@ -64,6 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(

private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile private var decommissioned = false
@volatile var driver: Option[RpcEndpointRef] = None

// If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
@@ -80,6 +81,9 @@ private[spark] class CoarseGrainedExecutorBackend(
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]

override def onStart(): Unit = {
logInfo("Registering PWR handler.")
SignalUtils.register("PWR")(decommissionSelf)

logInfo("Connecting to driver: " + driverUrl)
try {
_resources = parseOrFindResources(resourcesFileOpt)
@@ -160,6 +164,16 @@
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
if (decommissioned) {
logError("Asked to launch a task while decommissioned.")
driver match {
case Some(endpoint) =>
logInfo("Sending DecommissionExecutor to driver.")
endpoint.send(DecommissionExecutor(executorId))
case _ =>
logError("No registered driver to send Decommission to.")
}
}
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
@@ -242,6 +256,29 @@

System.exit(code)
}

private def decommissionSelf(): Boolean = {
logInfo("Decommissioning self w/sync")
try {
decommissioned = true
// Tell the driver we are decommissioned so it stops trying to schedule new tasks on us
if (driver.nonEmpty) {
driver.get.askSync[Boolean](DecommissionExecutor(executorId))
} else {
logError("No driver to message decommissioning.")
}
if (executor != null) {
executor.decommission()
}
logInfo("Done decommissioning self.")
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
logError(s"Error ${e} during attempt to decommission self")
false
}
}
}

private[spark] object CoarseGrainedExecutorBackend extends Logging {
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -216,16 +216,32 @@ private[spark] class Executor(
*/
private var heartbeatFailures = 0

/**
* Flag to prevent launching new tasks while decommissioned. There could be a race condition
* accessing this, but decommissioning is only intended to help, not to be a hard stop.
*/
private var decommissioned = false

heartbeater.start()

metricsPoller.start()

private[executor] def numRunningTasks: Int = runningTasks.size()

/**
* Mark an executor for decommissioning and avoid launching new tasks.
*/
private[spark] def decommission(): Unit = {
decommissioned = true
}

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val tr = new TaskRunner(context, taskDescription)
runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
if (decommissioned) {
log.error(s"Launching a task while in decommissioned state.")
}
}

def killTask(taskId: Long, interruptThread: Boolean, reason: String): Unit = {
@@ -17,6 +17,35 @@

package org.apache.spark.internal.config

// ====================================================================================
// The guideline for naming configurations
// ====================================================================================
/*
In general, the config name should be a noun that describes its basic purpose. It's
recommended to add a prefix to the config name to make the scope clearer. For example,
`spark.scheduler.mode` clearly indicates that this config is for the scheduler.

A config name can have multiple prefixes that are structured, which is similar to a
qualified Java class name. Each prefix behaves like a namespace. We should only create
a namespace if it's meaningful and can be shared by multiple configs. For example,
`buffer.inMemoryThreshold` is preferred over `buffer.in.memory.threshold`.

The following are best practices for naming configs in some common cases:
1. When adding configs for a big feature, it's better to create an umbrella config that
can turn the feature on/off, with a name like `featureName.enabled`. The other configs
of this feature should be put under the `featureName` namespace. For example:
- spark.sql.cbo.enabled
- spark.sql.cbo.starSchemaDetection
- spark.sql.cbo.joinReorder.enabled
- spark.sql.cbo.joinReorder.dp.threshold
2. When adding a boolean config, the name should be a verb that describes what
happens if this config is set to true, e.g. `spark.shuffle.sort.useRadixSort`.
3. When adding a config to specify a time duration, it's better to put the time unit
in the config name. For example, `featureName.timeoutMs`, which clearly indicates
that the time unit is milliseconds. The config entry should be created with
`ConfigBuilder#timeConf`, to support time strings like `2 minutes`.
*/

/**
* An entry contains all meta information for a configuration.
*
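To make the guideline above concrete, here is a hypothetical sketch (not part of this commit) of entries that follow it: an umbrella `featureName.enabled` switch plus a duration config whose name carries its unit and which is built with `ConfigBuilder#timeConf`. The config names, object name, and defaults below are illustrative assumptions only.

package org.apache.spark.internal.config

import java.util.concurrent.TimeUnit

// Hypothetical entries following the naming guideline; not part of this diff.
private[spark] object ExampleConfigs {

  // Umbrella on/off switch for a made-up feature.
  private[spark] val SOME_FEATURE_ENABLED =
    ConfigBuilder("spark.someFeature.enabled")
      .booleanConf
      .createWithDefault(false)

  // Duration config: the unit is visible in the name, and timeConf accepts
  // time strings such as "2 minutes" or "120s".
  private[spark] val SOME_FEATURE_TIMEOUT_MS =
    ConfigBuilder("spark.someFeature.timeoutMs")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("2min")
}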
@@ -71,4 +71,9 @@ private[spark] object Worker {
ConfigBuilder("spark.worker.ui.compressedLogFileLengthCacheSize")
.intConf
.createWithDefault(100)

private[spark] val WORKER_DECOMMISSION_ENABLED =
ConfigBuilder("spark.worker.decommission.enabled")
.booleanConf
.createWithDefault(false)
}
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -361,6 +361,7 @@ abstract class RDD[T: ClassTag](
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
// Block hit.
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().inputMetrics
@@ -374,6 +375,7 @@
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
// Need to compute the block.
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}
@@ -58,3 +58,11 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
private[spark]
case class SlaveLost(_message: String = "Slave lost", workerLost: Boolean = false)
extends ExecutorLossReason(_message)

/**
* A loss reason that means the executor is marked for decommissioning.
*
* This is used by the task scheduler to remove state associated with the executor, without
* yet failing any tasks that were still running on the executor before it is "fully" lost.
*/
private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -88,6 +88,10 @@ private[spark] class Pool(
schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
}

override def executorDecommission(executorId: String): Unit = {
schedulableQueue.asScala.foreach(_.executorDecommission(executorId))
}

override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
var shouldRevive = false
for (schedulable <- schedulableQueue.asScala) {
@@ -43,6 +43,7 @@ private[spark] trait Schedulable {
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
def executorDecommission(executorId: String): Unit
def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
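For orientation, a brief sketch (assumptions only, not code from this commit) of how a caller on the scheduler side might dispatch on the new ExecutorDecommission loss reason using the executorDecommission and executorLost hooks declared above; the helper object and method are hypothetical:

package org.apache.spark.scheduler

// Hypothetical helper, not part of this diff: decommissioning is treated as a "soft"
// loss that cleans up scheduling state while letting running tasks finish; any other
// reason is treated as a hard loss.
private[spark] object DecommissionAwareLossHandling {
  def handleExecutorLoss(
      pool: Schedulable,
      executorId: String,
      host: String,
      reason: ExecutorLossReason): Unit = reason match {
    case ExecutorDecommission =>
      pool.executorDecommission(executorId)
    case other =>
      pool.executorLost(executorId, host, other)
  }
}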
@@ -27,6 +27,9 @@ private[spark] trait SchedulerBackend {

def start(): Unit
def stop(): Unit
/**
* Update the current offers and schedule tasks
*/
def reviveOffers(): Unit
def defaultParallelism(): Int

