From 9a4cce67c8b811d2ba1fbc35b4d8c2ff77a4cd15 Mon Sep 17 00:00:00 2001
From: Devesh Agrawal
Date: Mon, 17 Aug 2020 14:55:55 -0700
Subject: [PATCH] @cloudfan's comments

---
 .../spark/executor/CoarseGrainedExecutorBackend.scala  |  2 +-
 .../org/apache/spark/internal/config/package.scala     | 10 ++++++++++
 .../org/apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../apache/spark/deploy/DecommissionWorkerSuite.scala  |  2 +-
 4 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 23de8afc61b3e..07258f270b458 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -297,7 +297,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       // This config is internal and only used by unit tests to force an executor
       // to hang around for longer when decommissioned.
       val initialSleepMillis = env.conf.getInt(
-        "spark.executor.decommission.initial.sleep.millis", sleep_time)
+        "spark.test.executor.decommission.initial.sleep.millis", sleep_time)
       if (initialSleepMillis > 0) {
         Thread.sleep(initialSleepMillis)
       }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 200cde0a2d3ed..34acf9f9b30cd 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1877,6 +1877,16 @@ package object config {
     .timeConf(TimeUnit.SECONDS)
     .createOptional
 
+  private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL =
+    ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL")
+      .doc("Duration for which a decommissioned executor's information will be kept after its " +
+        "removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " +
+        "decommissioning even after the mapper executor has been decommissioned. This allows " +
+        "eager recovery from fetch failures caused by decommissioning, increasing job robustness.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("5m")
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6501a75508db8..2a382380691d5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -148,7 +148,7 @@ private[spark] class TaskSchedulerImpl(
   // decommissioned.
   lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
     .expireAfterWrite(
-      conf.getLong("spark.decommissioningRememberAfterRemoval.seconds", 60L), TimeUnit.SECONDS)
+      conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS)
     .ticker(new Ticker{
       override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
     })
diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
index f2ed7736bb4dd..90b77a21ad02e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
@@ -207,7 +207,7 @@ class DecommissionWorkerSuite
     createWorkers(2)
     sc = createSparkContext(
       config.Tests.TEST_NO_STAGE_RETRY.key -> "false",
-      "spark.executor.decommission.initial.sleep.millis" -> initialSleepMillis.toString,
+      "spark.test.executor.decommission.initial.sleep.millis" -> initialSleepMillis.toString,
       config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key -> "true")
     val executorIdToWorkerInfo = getExecutorToWorkerAssignments
     val executorToDecom = executorIdToWorkerInfo.keysIterator.next