From 1ac23dea52cb1cd7fe760d83d153dac26c03ec87 Mon Sep 17 00:00:00 2001
From: Devesh Agrawal
Date: Tue, 18 Aug 2020 06:47:31 +0000
Subject: [PATCH] [SPARK-32613][CORE] Fix regressions in DecommissionWorkerSuite

### What changes were proposed in this pull request?

The DecommissionWorkerSuite started becoming flaky, and it revealed a real regression: the recently merged #29211 makes it necessary to remember an executor's decommissioning state for a short while after the executor has been removed.

In addition to fixing this issue, this PR ensures that DecommissionWorkerSuite continues to pass when executors haven't had a chance to exit eagerly, i.e. the old behavior from before #29211 still works.

Added some more tests to TaskSchedulerImplSuite to ensure that the decommissioning information is indeed purged after a timeout.

Hardened DecommissionWorkerSuite to make it wait for successful job completion.

### Why are the changes needed?

First, the intended behavior of decommissioning: if a fetch failure happens and the source executor was decommissioned, we want to treat that as an eager signal to clear all shuffle state associated with that executor. In addition, if we know that the host was decommissioned, we want to forget about all map statuses from all other executors on that decommissioned host. This is what the test "decommission workers ensure that fetch failures lead to rerun" verifies. This invariant is important to ensure that decommissioning a host does not lead to multiple fetch failures that might fail the job. The fetch failure can happen before the executor is truly marked "lost" because of heartbeat delays.

- However, #29211 eagerly exits executors once they are done decommissioning. This removal of the executor was racing with the fetch failure: by the time the fetch failure is triggered, the executor is already removed and its decommissioning information has been forgotten. (I verified this by delaying the decommissioning.) The fix is to keep the decommissioning information around for some time after removal, with some extra logic to finally purge it after a timeout (a sketch of this is shown below).

- In addition, the executor loss can also bump up `shuffleFileLostEpoch` (added in #28848). This happens because when the executor is lost, we forget the shuffle state of just that executor and increment `shuffleFileLostEpoch`. This increment precludes clearing the state of the entire host when the fetch failure arrives, because the failed task is still using the old epoch. The fix here is also simple: ignore `shuffleFileLostEpoch` when the shuffle status is being cleared due to a fetch failure resulting from host decommissioning.

I am deliberately keeping both of these fixes very local to decommissioning to avoid other regressions. The epoch handling in particular is tricky (it hasn't been fundamentally changed since it was first introduced in 2013).
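The "remember after removal" behavior boils down to two pieces of bookkeeping in `TaskSchedulerImpl`: a plain map for executors that are decommissioning but still alive, and a Guava cache with a write TTL for executors that have already been removed (see the TaskSchedulerImpl diff below). A minimal, self-contained sketch of the same idea — `DecomTracker`, `DecomInfo`, `ttlSeconds` and `nowMillis` are hypothetical names, not the real Spark types or configs — could look like this:

```scala
import java.util.concurrent.TimeUnit

import scala.collection.mutable

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder

// Illustrative stand-in for Spark's ExecutorDecommissionInfo.
case class DecomInfo(message: String, isHostDecommissioned: Boolean)

// Sketch of the two-level bookkeeping: a plain map for executors that are
// decommissioning but still alive, plus a TTL cache for executors that have
// already been removed. `ttlSeconds` and `nowMillis` are hypothetical knobs.
class DecomTracker(ttlSeconds: Long, nowMillis: () => Long) {

  // Executors that sent a decommission notification but are still running.
  private val pending = mutable.HashMap.empty[String, DecomInfo]

  // Executors that have since been removed; entries expire after the TTL so
  // this map cannot grow without bound.
  private val removed = CacheBuilder.newBuilder()
    .expireAfterWrite(ttlSeconds, TimeUnit.SECONDS)
    .ticker(new Ticker {
      // Drive expiry from an injectable clock so tests can use a manual clock.
      override def read(): Long = TimeUnit.MILLISECONDS.toNanos(nowMillis())
    })
    .build[String, DecomInfo]()
    .asMap()

  def decommission(execId: String, info: DecomInfo): Unit = {
    pending(execId) = info
  }

  // When the executor is finally lost, remember its decommission info for the TTL.
  def executorLost(execId: String): Unit = {
    pending.remove(execId).foreach(removed.put(execId, _))
  }

  // Fetch-failure handling can still recognize a recently removed executor
  // as decommissioned.
  def lookup(execId: String): Option[DecomInfo] = {
    pending.get(execId).orElse(Option(removed.get(execId)))
  }
}
```

Driving the Guava `Ticker` from an injectable clock is what lets the new TaskSchedulerImplSuite test advance a `ManualClock` past the TTL and assert that the remembered entry is eventually purged.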
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually ran DecommissionWorkerSuite several times using a script and ensured it all passed.

### (Internal) Configs added

I added two configs, one of which is really only meant for testing:

- `spark.test.executor.decommission.initial.sleep.millis`: Initial delay for the decommission shutdown thread. The default of 1 second matches the previous behavior. This is used for testing only, so it is kept "hidden" (i.e. not added as a config constant, to avoid config bloat).
- `spark.executor.decommission.removed.infoCacheTTL`: How long to keep the decommission entries of removed executors around. It defaults to 5 minutes. It should be around the average time it takes for all of the shuffle data to be fetched from the mapper to the reducer, but I think that can take a while since the reducers also do a multi-step sort.
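For reference, this is how the two new settings could be supplied via `SparkConf`; the values below are arbitrary examples for illustration, not recommendations:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Example values only: "10m" overrides the default 5 minute retention of removed
// executors' decommission info, and the test-only sleep keeps a decommissioned
// executor around for 5 seconds before it starts checking whether it can exit.
val conf = new SparkConf()
  .setAppName("decommission-config-example")
  .setMaster("local[2]")
  .set("spark.executor.decommission.removed.infoCacheTTL", "10m")
  .set("spark.test.executor.decommission.initial.sleep.millis", "5000")

val sc = new SparkContext(conf)
```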
Closes #29422 from agrawaldevesh/decom_fixes.

Authored-by: Devesh Agrawal
Signed-off-by: Wenchen Fan
---
 .../CoarseGrainedExecutorBackend.scala        | 10 ++-
 .../spark/internal/config/package.scala       | 10 +++
 .../apache/spark/scheduler/DAGScheduler.scala | 41 ++++++++----
 .../spark/scheduler/TaskSchedulerImpl.scala   | 29 +++++++--
 .../deploy/DecommissionWorkerSuite.scala      | 64 +++++++++++++++----
 .../scheduler/TaskSchedulerImplSuite.scala    | 36 ++++++++---
 6 files changed, 153 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 55fb76b3572a3..07258f270b458 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -294,10 +294,15 @@ private[spark] class CoarseGrainedExecutorBackend(
       override def run(): Unit = {
         var lastTaskRunningTime = System.nanoTime()
         val sleep_time = 1000 // 1s
-
+        // This config is internal and only used by unit tests to force an executor
+        // to hang around for longer when decommissioned.
+        val initialSleepMillis = env.conf.getInt(
+          "spark.test.executor.decommission.initial.sleep.millis", sleep_time)
+        if (initialSleepMillis > 0) {
+          Thread.sleep(initialSleepMillis)
+        }
         while (true) {
           logInfo("Checking to see if we can shutdown.")
-          Thread.sleep(sleep_time)
           if (executor == null || executor.numRunningTasks == 0) {
             if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
               logInfo("No running tasks, checking migrations")
@@ -323,6 +328,7 @@ private[spark] class CoarseGrainedExecutorBackend(
             // move forward.
             lastTaskRunningTime = System.nanoTime()
           }
+          Thread.sleep(sleep_time)
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 200cde0a2d3ed..34acf9f9b30cd 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1877,6 +1877,16 @@ package object config {
     .timeConf(TimeUnit.SECONDS)
     .createOptional

+  private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL =
+    ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL")
+      .doc("Duration for which a decommissioned executor's information will be kept after its " +
+        "removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " +
+        "decommissioning even after the mapper executor has been decommissioned. This allows " +
+        "eager recovery from fetch failures caused by decommissioning, increasing job robustness.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("5m")
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7641948ed4b30..ae0387e09cc6b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1846,7 +1846,14 @@ private[spark] class DAGScheduler(
             execId = bmAddress.executorId,
             fileLost = true,
             hostToUnregisterOutputs = hostToUnregisterOutputs,
-            maybeEpoch = Some(task.epoch))
+            maybeEpoch = Some(task.epoch),
+            // shuffleFileLostEpoch is ignored when a host is decommissioned because some
+            // decommissioned executors on that host might have been removed before this fetch
+            // failure and might have bumped up the shuffleFileLostEpoch. We ignore that, and
+            // proceed with unconditional removal of shuffle outputs from all executors on that
+            // host, including from those that we still haven't confirmed as lost due to heartbeat
+            // delays.
+            ignoreShuffleFileLostEpoch = isHostDecommissioned)
         }
       }

@@ -2012,7 +2019,8 @@ private[spark] class DAGScheduler(
       execId: String,
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
-      maybeEpoch: Option[Long] = None): Unit = {
+      maybeEpoch: Option[Long] = None,
+      ignoreShuffleFileLostEpoch: Boolean = false): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     logDebug(s"Considering removal of executor $execId; " +
       s"fileLost: $fileLost, currentEpoch: $currentEpoch")
@@ -2022,16 +2030,25 @@ private[spark] class DAGScheduler(
       blockManagerMaster.removeExecutor(execId)
       clearCacheLocs()
     }
-    if (fileLost &&
-      (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
-      shuffleFileLostEpoch(execId) = currentEpoch
-      hostToUnregisterOutputs match {
-        case Some(host) =>
-          logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
-          mapOutputTracker.removeOutputsOnHost(host)
-        case None =>
-          logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
-          mapOutputTracker.removeOutputsOnExecutor(execId)
+    if (fileLost) {
+      val remove = if (ignoreShuffleFileLostEpoch) {
+        true
+      } else if (!shuffleFileLostEpoch.contains(execId) ||
+          shuffleFileLostEpoch(execId) < currentEpoch) {
+        shuffleFileLostEpoch(execId) = currentEpoch
+        true
+      } else {
+        false
+      }
+      if (remove) {
+        hostToUnregisterOutputs match {
+          case Some(host) =>
+            logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
+            mapOutputTracker.removeOutputsOnHost(host)
+          case None =>
+            logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
+            mapOutputTracker.removeOutputsOnExecutor(execId)
+        }
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index a0c507e7f893b..2a382380691d5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -26,6 +26,9 @@ import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet}
 import scala.util.Random

+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.ExecutorMetrics
@@ -136,7 +139,21 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]

-  private val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
+  // We add executors here when we first get decommission notification for them. Executors can
+  // continue to run even after being asked to decommission, but they will eventually exit.
+  val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
+
+  // When they exit and we know of that via heartbeat failure, we will add them to this cache.
+  // This cache is consulted to know if a fetch failure is because a source executor was
+  // decommissioned.
+  lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
+    .expireAfterWrite(
+      conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS)
+    .ticker(new Ticker{
+      override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
+    })
+    .build[String, ExecutorDecommissionInfo]()
+    .asMap()

   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
@@ -910,7 +927,7 @@ private[spark] class TaskSchedulerImpl(
       // if we heard isHostDecommissioned ever true, then we keep that one since it is
       // most likely coming from the cluster manager and thus authoritative
       val oldDecomInfo = executorsPendingDecommission.get(executorId)
-      if (oldDecomInfo.isEmpty || !oldDecomInfo.get.isHostDecommissioned) {
+      if (!oldDecomInfo.exists(_.isHostDecommissioned)) {
         executorsPendingDecommission(executorId) = decommissionInfo
       }
     }
@@ -921,7 +938,9 @@ private[spark] class TaskSchedulerImpl(

   override def getExecutorDecommissionInfo(executorId: String)
     : Option[ExecutorDecommissionInfo] = synchronized {
-      executorsPendingDecommission.get(executorId)
+      executorsPendingDecommission
+        .get(executorId)
+        .orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
   }

   override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
@@ -1027,7 +1046,9 @@ private[spark] class TaskSchedulerImpl(
       }
     }

-    executorsPendingDecommission -= executorId
+
+    val decomInfo = executorsPendingDecommission.remove(executorId)
+    decomInfo.foreach(decommissionedExecutorsRemoved.put(executorId, _))

     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
index ee9a6be03868f..90b77a21ad02e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
@@ -84,6 +84,19 @@ class DecommissionWorkerSuite
     }
   }

+  // Unlike TestUtils.withListener, it also waits for the job to be done
+  def withListener(sc: SparkContext, listener: RootStageAwareListener)
+      (body: SparkListener => Unit): Unit = {
+    sc.addSparkListener(listener)
+    try {
+      body(listener)
+      sc.listenerBus.waitUntilEmpty()
+      listener.waitForJobDone()
+    } finally {
+      sc.listenerBus.removeListener(listener)
+    }
+  }
+
   test("decommission workers should not result in job failure") {
     val maxTaskFailures = 2
     val numTimesToKillWorkers = maxTaskFailures + 1
@@ -109,7 +122,7 @@ class DecommissionWorkerSuite
         }
       }
     }
-    TestUtils.withListener(sc, listener) { _ =>
+    withListener(sc, listener) { _ =>
       val jobResult = sc.parallelize(1 to 1, 1).map { _ =>
         Thread.sleep(5 * 1000L); 1
       }.count()
@@ -164,7 +177,7 @@ class DecommissionWorkerSuite
         }
       }
     }
-    TestUtils.withListener(sc, listener) { _ =>
+    withListener(sc, listener) { _ =>
       val jobResult = sc.parallelize(1 to 2, 2).mapPartitionsWithIndex((pid, _) => {
         val sleepTimeSeconds = if (pid == 0) 1 else 10
         Thread.sleep(sleepTimeSeconds * 1000L)
@@ -190,10 +203,11 @@ class DecommissionWorkerSuite
     }
   }

-  test("decommission workers ensure that fetch failures lead to rerun") {
+  def testFetchFailures(initialSleepMillis: Int): Unit = {
     createWorkers(2)
     sc = createSparkContext(
       config.Tests.TEST_NO_STAGE_RETRY.key -> "false",
+      "spark.test.executor.decommission.initial.sleep.millis" -> initialSleepMillis.toString,
       config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key -> "true")
     val executorIdToWorkerInfo = getExecutorToWorkerAssignments
     val executorToDecom = executorIdToWorkerInfo.keysIterator.next
@@ -212,22 +226,29 @@ class DecommissionWorkerSuite
       override def handleRootTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
         val taskInfo = taskEnd.taskInfo
         if (taskInfo.executorId == executorToDecom && taskInfo.attemptNumber == 0 &&
-          taskEnd.stageAttemptId == 0) {
+          taskEnd.stageAttemptId == 0 && taskEnd.stageId == 0) {
           decommissionWorkerOnMaster(workerToDecom,
             "decommission worker after task on it is done")
         }
       }
     }
-    TestUtils.withListener(sc, listener) { _ =>
+    withListener(sc, listener) { _ =>
       val jobResult = sc.parallelize(1 to 2, 2).mapPartitionsWithIndex((_, _) => {
         val executorId = SparkEnv.get.executorId
-        val sleepTimeSeconds = if (executorId == executorToDecom) 10 else 1
-        Thread.sleep(sleepTimeSeconds * 1000L)
+        val context = TaskContext.get()
+        // Only sleep in the first attempt to create the required window for decommissioning.
+        // Subsequent attempts don't need to be delayed to speed up the test.
+        if (context.attemptNumber() == 0 && context.stageAttemptNumber() == 0) {
+          val sleepTimeSeconds = if (executorId == executorToDecom) 10 else 1
+          Thread.sleep(sleepTimeSeconds * 1000L)
+        }
         List(1).iterator
       }, preservesPartitioning = true)
         .repartition(1).mapPartitions(iter => {
         val context = TaskContext.get()
         if (context.attemptNumber == 0 && context.stageAttemptNumber() == 0) {
+          // Wait a bit for the decommissioning to be triggered in the listener
+          Thread.sleep(5000)
           // MapIndex is explicitly -1 to force the entire host to be decommissioned
           // However, this will cause both the tasks in the preceding stage since the host here is
           // "localhost" (shortcoming of this single-machine unit test in that all the workers
@@ -246,6 +267,14 @@ class DecommissionWorkerSuite
     assert(tasksSeen.size === 6, s"Expected 6 tasks but got $tasksSeen")
   }

+  test("decommission stalled workers ensure that fetch failures lead to rerun") {
+    testFetchFailures(3600 * 1000)
+  }
+
+  test("decommission eager workers ensure that fetch failures lead to rerun") {
+    testFetchFailures(0)
+  }
+
   private abstract class RootStageAwareListener extends SparkListener {
     private var rootStageId: Option[Int] = None
     private val tasksFinished = new ConcurrentLinkedQueue[String]()
@@ -265,6 +294,7 @@ class DecommissionWorkerSuite
     override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
       jobEnd.jobResult match {
        case JobSucceeded => jobDone.set(true)
+       case JobFailed(exception) => logError(s"Job failed", exception)
      }
    }

@@ -272,7 +302,15 @@ class DecommissionWorkerSuite

     protected def handleRootTaskStart(start: SparkListenerTaskStart) = {}

+    private def getSignature(taskInfo: TaskInfo, stageId: Int, stageAttemptId: Int):
+      String = {
+      s"${stageId}:${stageAttemptId}:" +
+        s"${taskInfo.index}:${taskInfo.attemptNumber}-${taskInfo.status}"
+    }
+
     override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+      val signature = getSignature(taskStart.taskInfo, taskStart.stageId, taskStart.stageAttemptId)
+      logInfo(s"Task started: $signature")
       if (isRootStageId(taskStart.stageId)) {
         rootTasksStarted.add(taskStart.taskInfo)
         handleRootTaskStart(taskStart)
@@ -280,8 +318,7 @@ class DecommissionWorkerSuite
     }

     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      val taskSignature = s"${taskEnd.stageId}:${taskEnd.stageAttemptId}:" +
-        s"${taskEnd.taskInfo.index}:${taskEnd.taskInfo.attemptNumber}"
+      val taskSignature = getSignature(taskEnd.taskInfo, taskEnd.stageId, taskEnd.stageAttemptId)
       logInfo(s"Task End $taskSignature")
       tasksFinished.add(taskSignature)
       if (isRootStageId(taskEnd.stageId)) {
@@ -291,8 +328,13 @@ class DecommissionWorkerSuite
     }

     def getTasksFinished(): Seq[String] = {
-      assert(jobDone.get(), "Job isn't successfully done yet")
-      tasksFinished.asScala.toSeq
+      tasksFinished.asScala.toList
+    }
+
+    def waitForJobDone(): Unit = {
+      eventually(timeout(10.seconds), interval(100.milliseconds)) {
+        assert(jobDone.get(), "Job isn't successfully done yet")
+      }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index e5836458e7f91..66379d86f9bed 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{Clock, ManualClock, SystemClock}

 class FakeSchedulerBackend extends SchedulerBackend {
   def start(): Unit = {}
@@ -88,10 +88,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }

   def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = {
+    setupSchedulerWithMasterAndClock(master, new SystemClock, confs: _*)
+  }
+
+  def setupSchedulerWithMasterAndClock(master: String, clock: Clock, confs: (String, String)*):
+      TaskSchedulerImpl = {
     val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite")
     confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
-    taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler = new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES), clock = clock)
     setupHelper()
   }
@@ -1802,9 +1807,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(2 == taskDescriptions.head.resources(GPU).addresses.size)
   }

-  private def setupSchedulerForDecommissionTests(): TaskSchedulerImpl = {
-    val taskScheduler = setupSchedulerWithMaster(
+  private def setupSchedulerForDecommissionTests(clock: Clock): TaskSchedulerImpl = {
+    val taskScheduler = setupSchedulerWithMasterAndClock(
       s"local[2]",
+      clock,
       config.CPUS_PER_TASK.key -> 1.toString)
     taskScheduler.submitTasks(FakeTask.createTaskSet(2))
     val multiCoreWorkerOffers = IndexedSeq(WorkerOffer("executor0", "host0", 1),
@@ -1815,7 +1821,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }

   test("scheduler should keep the decommission info where host was decommissioned") {
-    val scheduler = setupSchedulerForDecommissionTests()
+    val scheduler = setupSchedulerForDecommissionTests(new SystemClock)
     scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", false))
     scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", true))

@@ -1829,8 +1835,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(scheduler.getExecutorDecommissionInfo("executor2").isEmpty)
   }

-  test("scheduler should ignore decommissioning of removed executors") {
-    val scheduler = setupSchedulerForDecommissionTests()
+  test("scheduler should eventually purge removed and decommissioned executors") {
+    val clock = new ManualClock(10000L)
+    val scheduler = setupSchedulerForDecommissionTests(clock)

     // executor 0 is decommissioned after loosing
     assert(scheduler.getExecutorDecommissionInfo("executor0").isEmpty)
@@ -1839,14 +1846,27 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     scheduler.executorLost("executor0", ExecutorExited(0, false, "normal"))
     scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", false))
     assert(scheduler.getExecutorDecommissionInfo("executor0").isEmpty)
+    assert(scheduler.executorsPendingDecommission.isEmpty)
+    clock.advance(5000)
+
     // executor 1 is decommissioned before loosing
     assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty)
     scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
     assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined)
+    clock.advance(2000)
     scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
-    assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty)
+    assert(scheduler.decommissionedExecutorsRemoved.size === 1)
+    assert(scheduler.executorsPendingDecommission.isEmpty)
+    clock.advance(2000)
+    // The decommission info of the removed executor is still retained: the removal TTL hasn't expired yet
+    assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined)
     scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
+    clock.advance(2000)
+    assert(scheduler.decommissionedExecutorsRemoved.size === 1)
+    assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined)
+    clock.advance(301000)
     assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty)
+    assert(scheduler.decommissionedExecutorsRemoved.isEmpty)
   }

   /**