Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,16 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// Asynchronously kill the executor to avoid blocking the current thread
killExecutorThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val heartbeatTimeoutReason = ExecutorProcessLost(
s"Executor heartbeat timed out after ${now - lastSeenMs} ms")
// Note: we want to get an executor back after expiring this one,
// so do not simply call `sc.killExecutor` here (SPARK-8119)
sc.killAndReplaceExecutor(executorId)
sc.schedulerBackend match {
case backend: CoarseGrainedSchedulerBackend =>
backend.killAndReplaceExecutor(executorId, heartbeatTimeoutReason)
case _ =>
sc.killAndReplaceExecutor(executorId)
}
// SPARK-27348: in case of the executors which are not gracefully shut down,
// we should remove lost executors from CoarseGrainedSchedulerBackend manually
// here to guarantee two things:
Expand All @@ -231,9 +238,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// those executors to avoid app hang
sc.schedulerBackend match {
case backend: CoarseGrainedSchedulerBackend =>
backend.driverEndpoint.send(RemoveExecutor(executorId,
ExecutorProcessLost(
s"Executor heartbeat timed out after ${now - lastSeenMs} ms")))
backend.driverEndpoint.send(RemoveExecutor(executorId, heartbeatTimeoutReason))

// LocalSchedulerBackend is used locally and only has one single executor
case _: LocalSchedulerBackend =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
@GuardedBy("CoarseGrainedSchedulerBackend.this")
private[scheduler] val executorsPendingToRemove = new HashMap[String, Boolean]

// When a forced driver-side removal should preserve a more specific loss reason, keep it until
// the backend reports the executor removed.
@GuardedBy("CoarseGrainedSchedulerBackend.this")
private val pendingRemovalLossReasons = new HashMap[String, ExecutorLossReason]

// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]

Expand Down Expand Up @@ -467,12 +472,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorDataMap -= executorId
executorsPendingLossReason -= executorId
val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
val pendingRemovalLossReason = pendingRemovalLossReasons.remove(executorId)
val decommissionInfoOpt = executorsPendingDecommission.remove(executorId)
if (killedByDriver) {
ExecutorKilled
} else if (decommissionInfoOpt.isDefined) {
val decommissionInfo = decommissionInfoOpt.get
ExecutorDecommission(decommissionInfo.workerHost, decommissionInfo.message)
} else if (reason == ExecutorKilled) {
pendingRemovalLossReason.getOrElse(reason)
} else {
reason
}
Expand Down Expand Up @@ -938,6 +946,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
adjustTargetNumExecutors: Boolean,
countFailures: Boolean,
force: Boolean): Seq[String] = {
killExecutors(executorIds, adjustTargetNumExecutors, countFailures, force, None)
}

/**
 * Force-kill a single executor while remembering why the driver gave up on it.
 *
 * The kill is issued with `force = true` and without adjusting the target number of
 * executors (presumably so the cluster manager can bring up a replacement). Failures are
 * counted, so the removal is not flagged as a plain driver-initiated kill, and `lossReason`
 * is handed to `killExecutors` so it can be preferred over a generic `ExecutorKilled` report.
 *
 * @param executorId id of the executor to kill
 * @param lossReason the more specific reason to associate with the pending removal
 * @return true if `killExecutors` reported at least one executor as killed
 */
private[spark] def killAndReplaceExecutor(
    executorId: String,
    lossReason: ExecutorLossReason): Boolean = {
  val killedExecutorIds = killExecutors(
    Seq(executorId),
    adjustTargetNumExecutors = false,
    countFailures = true,
    force = true,
    lossReason = Some(lossReason))
  killedExecutorIds.nonEmpty
}

private[spark] def killExecutors(
executorIds: Seq[String],
adjustTargetNumExecutors: Boolean,
countFailures: Boolean,
force: Boolean,
lossReason: Option[ExecutorLossReason]): Seq[String] = {
logInfo(
log"Requesting to kill executor(s) ${MDC(LogKeys.EXECUTOR_IDS, executorIds.mkString(", "))}")

Expand All @@ -952,7 +976,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val executorsToKill = knownExecutors
.filter { id => !executorsPendingToRemove.contains(id) }
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
executorsToKill.foreach { id =>
executorsPendingToRemove(id) = !countFailures
lossReason.foreach { reason =>
pendingRemovalLossReasons(id) = reason
}
}

logInfo(log"Actual list of executor(s) to be killed is " +
log"${MDC(LogKeys.EXECUTOR_IDS, executorsToKill.mkString(", "))}")
Expand All @@ -975,8 +1004,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
val cleanedKillResponse = killResponse.transform { result =>
result match {
case scala.util.Success(true) =>
case _ =>
CoarseGrainedSchedulerBackend.this.synchronized {
executorsToKill.foreach(pendingRemovalLossReasons -= _)
}
}
result
}(ThreadUtils.sameThread)

killResponse.flatMap(killSuccessful =>
cleanedKillResponse.flatMap(killSuccessful =>
Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String])
)(ThreadUtils.sameThread)
}
Expand Down
62 changes: 58 additions & 4 deletions core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,12 @@ class HeartbeatReceiverSuite
val fakeClusterManager = new FakeClusterManager(rpcEnv, conf)
val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager)
val fakeSchedulerBackend =
new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef, sc.resourceProfileManager)
new FakeSchedulerBackend(
scheduler,
rpcEnv,
fakeClusterManagerRef,
sc.resourceProfileManager,
removeExecutorsOnKillWithReason = Some(ExecutorKilled))
when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)

// Register fake executors with our fake scheduler backend
Expand Down Expand Up @@ -215,6 +220,51 @@ class HeartbeatReceiverSuite
eventually(timeout(5.seconds)) {
assert(!fakeSchedulerBackend.getExecutorIds().contains(executorId1))
assert(!fakeSchedulerBackend.getExecutorIds().contains(executorId2))
verify(scheduler).executorLost(executorId1,
ExecutorProcessLost(s"Executor heartbeat timed out after ${executorTimeout * 2} ms"))
verify(scheduler).executorLost(executorId2,
ExecutorProcessLost(s"Executor heartbeat timed out after ${executorTimeout * 2} ms"))
}
fakeSchedulerBackend.stop()
}

// Verifies that when the scheduler backend itself reports a concrete loss reason while the
// timed-out executor is being killed, the task scheduler is told that concrete reason rather
// than the generic heartbeat-timeout reason.
test("heartbeat timeout should not override a concrete backend loss reason") {
val rpcEnv = sc.env.rpcEnv
val fakeClusterManager = new FakeClusterManager(rpcEnv, conf)
// Endpoint names carry a "-concrete-loss" suffix; presumably to avoid clashing with the
// endpoints registered by the sibling heartbeat-expiry test -- confirm.
val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm-concrete-loss", fakeClusterManager)
// The reason the fake backend will report for the executor as part of handling the kill.
val concreteLossReason = ExecutorExited(
1,
exitCausedByApp = false,
"Executor pod was deleted by the cluster manager.")
// With removeExecutorsOnKillWithReason set, FakeSchedulerBackend.doKillExecutors calls
// removeExecutor(id, concreteLossReason) for every executor it is asked to kill.
val fakeSchedulerBackend =
new FakeSchedulerBackend(
scheduler,
rpcEnv,
fakeClusterManagerRef,
sc.resourceProfileManager,
removeExecutorsOnKillWithReason = Some(concreteLossReason))
when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend)

// Register one fake executor with the backend's driver endpoint.
fakeSchedulerBackend.start()
val dummyExecutorEndpoint = new FakeExecutorEndpoint(rpcEnv)
val dummyExecutorEndpointRef =
rpcEnv.setupEndpoint("fake-executor-concrete-loss", dummyExecutorEndpoint)
fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
RegisterExecutor(executorId1, dummyExecutorEndpointRef, "1.2.3.4", 0, Map.empty,
Map.empty, Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
// Presumably these make the heartbeat receiver aware of the scheduler and of executorId1
// (suite helpers defined outside this hunk) -- confirm against the suite.
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
triggerHeartbeat(executorId1, executorShouldReregister = false)

// Move the receiver's clock forward by twice the executor timeout, then ask it to expire
// dead hosts so that executorId1 is treated as timed out.
val executorTimeout = heartbeatReceiver.invokePrivate(_executorTimeoutMs())
heartbeatReceiverClock.advance(executorTimeout * 2)
heartbeatReceiverRef.askSync[Boolean](ExpireDeadHosts)
// The kill is submitted to a separate thread; shut that pool down and wait (up to 10 seconds)
// so the submitted kill has a chance to finish before the verification below.
val killThread = heartbeatReceiver.invokePrivate(_killExecutorThread())
killThread.shutdown()
killThread.awaitTermination(10L, TimeUnit.SECONDS)

// The scheduler must have been told the backend's concrete reason.
eventually(timeout(5.seconds)) {
verify(scheduler).executorLost(executorId1, concreteLossReason)
}
fakeSchedulerBackend.stop()
}
Expand Down Expand Up @@ -328,19 +378,23 @@ private class FakeSchedulerBackend(
scheduler: TaskSchedulerImpl,
rpcEnv: RpcEnv,
clusterManagerEndpoint: RpcEndpointRef,
resourceProfileManager: ResourceProfileManager)
resourceProfileManager: ResourceProfileManager,
removeExecutorsOnKillWithReason: Option[ExecutorLossReason] = None)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

def this() = this(null, null, null, null)
def this() = this(null, null, null, null, removeExecutorsOnKillWithReason = None)

/**
 * Forwards the requested per-resource-profile executor totals to the fake cluster manager
 * endpoint as a `RequestExecutors` message and returns the endpoint's Boolean reply.
 */
protected override def doRequestTotalExecutors(
    resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
  val request = RequestExecutors(
    resourceProfileToTotalExecs,
    numLocalityAwareTasksPerResourceProfileId,
    rpHostToLocalTaskCount,
    Set.empty)
  clusterManagerEndpoint.ask[Boolean](request)
}
}

/**
 * Test hook for executor kills. If a loss reason was configured on this fake backend, every
 * requested executor is first passed to `removeExecutor` with that reason; the kill request
 * is then forwarded to the fake cluster manager endpoint and its Boolean reply is returned.
 */
protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
  for {
    lossReason <- removeExecutorsOnKillWithReason
    executorId <- executorIds
  } removeExecutor(executorId, lossReason)

  val killRequest = KillExecutors(executorIds)
  clusterManagerEndpoint.ask[Boolean](killRequest)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.language.postfixOps
import scala.reflect.ClassTag

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import org.mockito.Mockito.{verify, when}
import org.mockito.invocation.InvocationOnMock
import org.scalatest.concurrent.Eventually
import org.scalatestplus.mockito.MockitoSugar._
Expand Down Expand Up @@ -606,6 +606,36 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
assert(mockEndpointRef.decommissionReceived)
}

// A kill request that carries a specific loss reason but is rejected must not leave that
// reason behind: a later, unrelated ExecutorKilled removal has to reach the scheduler as-is.
test("pending replacement loss reason should be cleared when executor kill is rejected") {
  val sparkConf = new SparkConf()
    .set(SCHEDULER_REVIVE_INTERVAL.key, "1m")
    .set(EXECUTOR_INSTANCES, 0)
    .setMaster(
      "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
    .setAppName("test")
  sc = new SparkContext(sparkConf)

  val testBackend = sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
  val taskScheduler = testBackend.getTaskSchedulerImpl()

  // Register a single executor backed by mocked RPC handles.
  val executorId = "1"
  val executorRef = mock[RpcEndpointRef]
  val executorAddress = mock[RpcAddress]
  val registration = RegisterExecutor(
    executorId,
    executorRef,
    executorAddress.host,
    1,
    Map.empty,
    Map.empty,
    Map.empty,
    ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
  testBackend.driverEndpoint.askSync[Boolean](registration)

  // The kill is expected to come back empty, i.e. rejected (presumably because the test
  // backend does not acknowledge kill requests -- confirm against its doKillExecutors).
  val timeoutReason = ExecutorProcessLost("Executor heartbeat timed out")
  val killedIds = testBackend.killExecutors(
    Seq(executorId),
    adjustTargetNumExecutors = false,
    countFailures = true,
    force = true,
    lossReason = Some(timeoutReason))
  assert(killedIds.isEmpty)

  // A subsequent plain ExecutorKilled removal must not be rewritten to the timeout reason.
  testBackend.driverEndpoint.send(RemoveExecutor(executorId, ExecutorKilled))
  eventually(timeout(5.seconds)) {
    verify(taskScheduler).executorLost(executorId, ExecutorKilled)
  }
}

private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
sc.submitJob(
rdd,
Expand Down