29 changes: 10 additions & 19 deletions core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -17,7 +17,7 @@

package org.apache.spark

import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.scheduler.ExecutorDecommissionReason
Contributor

Why are we renaming this?

+1, I would argue against unnecessary renaming even if it seems a bit "unnatural". It creates unnecessary diff noise.

To me "Info" and "Reason" are both similar: They both portend "additional information".

Member
@dongjoon-hyun Sep 18, 2020

I guess @Ngone51 is trying to follow the style of TaskEndReason.

Member Author

Thanks @dongjoon-hyun for the clarification. In addition to following the style of TaskEndReason, I actually also want to handle the decommission info/reason in a similar way to TaskEndReason.
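For illustration, a minimal sketch (not code from this PR; the match arms and helper are assumptions) of what TaskEndReason-style handling of the new reason hierarchy could look like:

```scala
// Hypothetical sketch: matching on the reason hierarchy, in the spirit of TaskEndReason.
import org.apache.spark.scheduler.{DynamicAllocationDecommission, ExecutorDecommissionReason, StandaloneDecommission}

def describeDecommission(reason: ExecutorDecommissionReason): String = reason match {
  case DynamicAllocationDecommission() =>
    "scaled down by dynamic allocation"
  case StandaloneDecommission(Some(host)) =>
    s"worker $host is being decommissioned"
  case other =>
    other.toString // fall back to the reason's own description
}
```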


/**
* A client that communicates with the cluster manager to request or kill executors.
@@ -88,44 +88,35 @@ private[spark] trait ExecutorAllocationClient {
* Default implementation delegates to kill, scheduler must override
* if it supports graceful decommissioning.
*
* @param executorsAndDecomInfo identifiers of executors & decom info.
* @param executorsAndDecomReason identifiers of executors & decom reason.
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been decommissioned.
* @param triggeredByExecutor whether the decommission is triggered at executor.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def decommissionExecutors(
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean,
triggeredByExecutor: Boolean): Seq[String] = {
killExecutors(executorsAndDecomInfo.map(_._1),
executorsAndDecomReason: Array[(String, ExecutorDecommissionReason)],
Contributor

is it possible that different executors have different ExecutorDecommissionReason? If it's not possible, I think we are over-engineering here.

This is how it was earlier -- so we aren't changing the semantics, aside from the renaming :-) And yes, this can happen: different executors on different hosts would have different ExecutorDecommissionReason/Info, with different hosts potentially in them.

This is simply a bulk API: instead of making n calls, we fold them into one.
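To illustrate the bulk shape (a sketch only; the executor IDs are made up and `client` stands in for any ExecutorAllocationClient), one call can carry different reasons for different executors:

```scala
// Sketch: a single bulk call covering executors with different decommission reasons.
val executorsAndDecomReason: Array[(String, ExecutorDecommissionReason)] = Array(
  ("exec-1", DynamicAllocationDecommission()),               // scaled down by dynamic allocation
  ("exec-2", StandaloneDecommission(Some("worker-host-a"))), // its whole worker is going away
  ("exec-3", StandaloneDecommission(None)))                  // only the executor is decommissioned

val acknowledged: Seq[String] = client.decommissionExecutors(
  executorsAndDecomReason,
  adjustTargetNumExecutors = false)
```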

adjustTargetNumExecutors: Boolean): Seq[String] = {
killExecutors(executorsAndDecomReason.map(_._1),
adjustTargetNumExecutors,
countFailures = false)
}


/**
* Request that the cluster manager decommission the specified executor.
* Delegates to decommissionExecutors.
*
* @param executorId identifiers of executor to decommission
* @param decommissionInfo information about the decommission (reason, host loss)
* @param decomReason the decommission reason of the executor
* @param adjustTargetNumExecutors if we should adjust the target number of executors.
* @param triggeredByExecutor whether the decommission is triggered at executor.
* (TODO: add a new type like `ExecutorDecommissionInfo` for the
* case where executor is decommissioned at executor first, so we
* don't need this extra parameter.)
* @return whether the request is acknowledged by the cluster manager.
*/
final def decommissionExecutor(
executorId: String,
decommissionInfo: ExecutorDecommissionInfo,
adjustTargetNumExecutors: Boolean,
triggeredByExecutor: Boolean = false): Boolean = {
decomReason: ExecutorDecommissionReason,
adjustTargetNumExecutors: Boolean): Boolean = {
val decommissionedExecutors = decommissionExecutors(
Array((executorId, decommissionInfo)),
adjustTargetNumExecutors = adjustTargetNumExecutors,
triggeredByExecutor = triggeredByExecutor)
Array((executorId, decomReason)),
adjustTargetNumExecutors = adjustTargetNumExecutors)
decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
}

@@ -579,12 +579,11 @@ private[spark] class ExecutorAllocationManager(
// We don't want to change our target number of executors, because we already did that
// when the task backlog decreased.
if (decommissionEnabled) {
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(id =>
(id, DynamicAllocationDecommission())).toArray[(String, ExecutorDecommissionReason)]
client.decommissionExecutors(
executorIdsWithoutHostLoss,
adjustTargetNumExecutors = false,
triggeredByExecutor = false)
adjustTargetNumExecutors = false)
} else {
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
countFailures = false, force = false)
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.scheduler.StandaloneDecommission
import org.apache.spark.util.{RpcUtils, ThreadUtils}

/**
@@ -182,8 +182,7 @@ private[spark] class StandaloneAppClient(
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost)
} else if (state == ExecutorState.DECOMMISSIONED) {
listener.executorDecommissioned(fullId,
ExecutorDecommissionInfo(message.getOrElse(""), workerHost))
listener.executorDecommissioned(fullId, StandaloneDecommission(workerHost))
}

case WorkerRemoved(id, host, message) =>
@@ -17,7 +17,7 @@

package org.apache.spark.deploy.client

import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.scheduler.ExecutorDecommissionReason

/**
* Callbacks invoked by deploy client when various events happen. There are currently five events:
@@ -41,7 +41,7 @@ private[spark] trait StandaloneAppClientListener {
def executorRemoved(
fullId: String, message: String, exitStatus: Option[Int], workerHost: Option[String]): Unit

def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
def executorDecommissioned(fullId: String, decomReason: ExecutorDecommissionReason): Unit

def workerRemoved(workerId: String, host: String, message: String): Unit
}
@@ -1826,7 +1826,7 @@ private[spark] class DAGScheduler(
val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
val isHostDecommissioned = taskScheduler
.getExecutorDecommissionState(bmAddress.executorId)
.exists(_.workerHost.isDefined)
.exists(_.isHostDecommissioned)

// Shuffle output of all executors on host `bmAddress.host` may be lost if:
// - External shuffle service is enabled, so we assume that all shuffle data on node is
@@ -2368,7 +2368,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case ExecutorLost(execId, reason) =>
val workerHost = reason match {
case ExecutorProcessLost(_, workerHost, _) => workerHost
case ExecutorDecommission(workerHost) => workerHost
case ExecutorDecommission(_, host) => host

See comment below for ExecutorDecommission ... Should this be changed to a:

case decom @ ExecutorDecommission => decom.workerHost // or decom.host

You don't need to add an extra '_' then.

Member Author

Yeah, this is actually a good point! It's actually a rule in Databricks' Scala style guide. But I just followed the style of ExecutorProcessLost above here. I think it's acceptable when there aren't many arguments being expanded.

case _ => None
}
dagScheduler.handleExecutorLost(execId, workerHost)
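For reference, the binder form suggested above needs a type pattern to be valid Scala; a minimal sketch of the two equivalent arms (assuming the ExecutorDecommission(reason, host) shape added later in this diff):

```scala
val workerHost = reason match {
  case ExecutorProcessLost(_, workerHost, _) => workerHost
  // Destructuring arm, as written in the PR (the reason string is ignored):
  //   case ExecutorDecommission(_, host) => host
  // Type-pattern arm, which keeps compiling if more fields are added later:
  case decom: ExecutorDecommission => decom.host
  case _ => None
}
```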

This file was deleted.

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

private[spark] sealed trait ExecutorDecommissionReason {
Contributor

I get why this is a sealed trait, namely we're pattern matching against it. But this seems to remove flexibility for anyone working on scheduler backends.

@holdenk, can you please provide an example of how having this as a sealed trait would limit the flexibility?

It is marked as private[spark], so the resource-manager-specific scheduler backends should be able to extend it ... no?

Duh. Sorry for my n00bness. I can totally see why this shouldn't be a sealed trait: for example, it is forcing the TestExecutorDecommissionInfo to be in this file.

@Ngone51 is there a strong reason for making this a sealed trait? Is that required by the RPC framework, for example? If not, I don't think it's worth it.

val reason: String = "decommissioned"

I don't think the reason field is really needed anywhere, besides being used for toString? Should we just require overriding toString by marking toString abstract? I don't think that child classes need to override both toString and reason: I would prefer we just override methods instead of fields.

Member Author

We also indirectly use the reason for logExecutorLoss. And yeah we can just override toString indeed.

override def toString: String = reason
}
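A rough sketch (hypothetical names, not this PR's code) of the direction the reviewers suggest: a single abstract description method with toString implemented in terms of it, so subclasses override a method rather than a field:

```scala
// Hypothetical alternative shape, following the review discussion above.
private[spark] trait ExecutorDecommissionReasonAlt {
  /** Subclasses describe themselves; there is no separate `reason` field to keep in sync. */
  protected def describe: String
  override def toString: String = describe
}

case class DynamicAllocationDecommissionAlt() extends ExecutorDecommissionReasonAlt {
  override protected def describe: String = "decommissioned by dynamic allocation"
}
```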

/**
* For the case where decommission is triggered because of executor dynamic allocation.
*/
case class DynamicAllocationDecommission() extends ExecutorDecommissionReason {
override val reason: String = "decommissioned by dynamic allocation"
}

/**
* For the case where decommission is triggered at the executor first.
*/
class ExecutorTriggeredDecommission extends ExecutorDecommissionReason

/**
* For the Kubernetes workloads
*/
case class K8SDecommission() extends ExecutorTriggeredDecommission
Contributor

I think maybe we could have a better level here; there isn't really anything K8s-specific about this kind of message. Rather, all external cluster manager decommissions could perhaps be the same?
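A hedged sketch of that generalization (the name is hypothetical and not part of this PR): one reason shared by all external cluster managers instead of a K8s-specific one:

```scala
// Hypothetical: a single reason for any cluster-manager-triggered decommission.
case class ClusterManagerTriggeredDecommission() extends ExecutorTriggeredDecommission {
  override val reason: String = "decommissioned by the cluster manager"
}
```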


/**
* For the Standalone workloads.
* @param workerHost When workerHost is defined, it means the worker has been decommissioned too.
* Used to infer if the shuffle data might be lost even if the external shuffle
* service is enabled.
*/
case class StandaloneDecommission(workerHost: Option[String] = None)
extends ExecutorDecommissionReason {
override val reason: String = if (workerHost.isDefined) {
s"Worker ${workerHost.get} decommissioned"
} else {
"decommissioned"
}
}

/**

Can you move this test-only class somewhere in the test-only package?

See TestResourceIDs as an example.

* For test only.
*/
case class TestExecutorDecommission(host: Option[String] = None)
extends ExecutorDecommissionReason {
override val reason: String = if (host.isDefined) {
s"Host ${host.get} decommissioned(test)"
} else {
"decommissioned(test)"
}
}

/**
* State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
* from the ExecutorDecommissionReason above but it is kept distinct to allow the state to evolve
* independently from the message.
*/
case class ExecutorDecommissionState(
// Timestamp the decommissioning commenced as per the Driver's clock,
// to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL
// is configured.
startTime: Long,
reason: ExecutorDecommissionReason) {

def isHostDecommissioned: Boolean = reason match {
case StandaloneDecommission(workerHost) => workerHost.isDefined
case _ => false
}

def host: Option[String] = reason match {
case StandaloneDecommission(workerHost) => workerHost
case _ => None
}
}
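For context, a small usage sketch (values are made up) of how the derived state answers the host-loss question asked elsewhere in this diff by DAGScheduler and TaskSchedulerImpl:

```scala
// Sketch: deriving host-loss information from the stored reason.
val standaloneState = ExecutorDecommissionState(
  startTime = System.currentTimeMillis(),
  reason = StandaloneDecommission(Some("worker-host-a")))
standaloneState.isHostDecommissioned // true: the whole worker is going away
standaloneState.host                 // Some("worker-host-a")

val dynState = ExecutorDecommissionState(System.currentTimeMillis(), DynamicAllocationDecommission())
dynState.isHostDecommissioned        // false: only the executor is being removed
```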
@@ -71,7 +71,8 @@ case class ExecutorProcessLost(
* This is used by the task scheduler to remove state associated with the executor, but
* not yet fail any tasks that were running in the executor before the executor is "fully" lost.
*
* @param workerHost it is defined when the worker is decommissioned too
* @param reason the reason why the executor is decommissioned
* @param host it is defined when the host where the executor located is decommissioned too
*/
private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
extends ExecutorLossReason("Executor decommission.")
private [spark] case class ExecutorDecommission(reason: String, host: Option[String] = None)

My Scala knowledge is really, really poor, but I would rather we make this a non-case class if you are planning to do this. Currently, I think the field "reason" is going to be duplicated in the base class ExecutorLossReason and in ExecutorDecommission.

That's also the reason why you are pattern matching it above with an additional _ (for the reason) argument, when you really don't care about the reason.

Member Author

The case class is needed because we'd apply pattern matching on it.

The "reason" is necessary because of class inheritance, no? Please see ExecutorProcessLost for as instance, the ExecutorProcessLost also has the field _message, which is needed to assigne to ExecutorProcessLost.message

Contributor

You can write unapply methods if you need to do pattern matching with something other than a case class.

extends ExecutorLossReason(reason)
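A minimal sketch of that suggestion (hypothetical names, not this PR's code): a plain class with a hand-written extractor still supports pattern matching, without repeating the reason as a case-class field:

```scala
// Hypothetical non-case-class variant with a companion unapply.
private[spark] class ExecutorDecommissionAlt(reason: String, val host: Option[String] = None)
  extends ExecutorLossReason(reason)

private[spark] object ExecutorDecommissionAlt {
  def unapply(d: ExecutorDecommissionAlt): Option[Option[String]] = Some(d.host)
}

// Usage: the reason string stays only in the base class; matching binds just the host.
//   case ExecutorDecommissionAlt(host) => host
```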
@@ -101,7 +101,7 @@
/**
* Process a decommissioning executor.
*/
def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
def executorDecommission(executorId: String, reason: ExecutorDecommissionReason): Unit

/**
* If an executor is decommissioned, return its corresponding decommission info
@@ -906,12 +906,12 @@ private[spark] class TaskSchedulerImpl(
}

override def executorDecommission(
executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
executorId: String, reason: ExecutorDecommissionReason): Unit = {
synchronized {
// Don't bother noting decommissioning for executors that we don't know about
if (executorIdToHost.contains(executorId)) {
executorsPendingDecommission(executorId) =
ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.workerHost)
ExecutorDecommissionState(clock.getTimeMillis(), reason)
}
}
rootPool.executorDecommission(executorId)
@@ -970,6 +970,9 @@
logDebug(s"Executor $executorId on $hostPort lost, but reason not yet known.")
case ExecutorKilled =>
logInfo(s"Executor $executorId on $hostPort killed by driver.")
case ExecutorDecommission(reason, _) =>
// use logInfo instead of logError as the loss of decommissioned executor is what we expect
logInfo(s"Decommissioned executor $executorId on $hostPort shutdown: $reason")

Instead of 'shutdown', should we say 'is finally lost'? That would be more accurate in this setting.

+1 on this change to avoid log spam.

case _ =>
logError(s"Lost executor $executorId on $hostPort: $reason")
}
@@ -1055,7 +1058,7 @@
// exposed for test
protected final def isHostDecommissioned(host: String): Boolean = {
hostToExecutors.get(host).exists { executors =>
executors.exists(e => getExecutorDecommissionState(e).exists(_.workerHost.isDefined))
executors.exists(e => getExecutorDecommissionState(e).exists(_.isHostDecommissioned))
}
}

@@ -991,7 +991,7 @@ private[spark] class TaskSetManager(
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
val exitCausedByApp: Boolean = reason match {
case exited: ExecutorExited => exited.exitCausedByApp
case ExecutorKilled | ExecutorDecommission(_) => false
case ExecutorKilled | ExecutorDecommission(_, _) => false

I am wondering if we should instead pattern match in a separate arm like:

_ @ ExecutorDecommission => false

To avoid having to change the case arms when we make changes to the structure definitions.

case ExecutorProcessLost(_, _, false) => false
case _ => true
}
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.scheduler.ExecutorLossReason
import org.apache.spark.util.SerializableBuffer
