Commit 9a4cce6

@cloudfan's comments
dagrawal3409 committed Aug 17, 2020
1 parent 343bf8b commit 9a4cce6
Showing 4 changed files with 13 additions and 3 deletions.

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

@@ -297,7 +297,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       // This config is internal and only used by unit tests to force an executor
       // to hang around for longer when decommissioned.
       val initialSleepMillis = env.conf.getInt(
-        "spark.executor.decommission.initial.sleep.millis", sleep_time)
+        "spark.test.executor.decommission.initial.sleep.millis", sleep_time)
       if (initialSleepMillis > 0) {
         Thread.sleep(initialSleepMillis)
       }
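
The knob stays test-only after the rename; the spark.test. prefix just makes that explicit in the key itself. A minimal sketch of how a suite might set it (master URL, app name, and sleep value are illustrative, not taken from this commit):

  import org.apache.spark.{SparkConf, SparkContext}

  // Sketch: keep a decommissioned executor alive for 5 extra seconds in a test.
  val conf = new SparkConf()
    .setMaster("local-cluster[2, 1, 1024]")
    .setAppName("decommission-sleep-sketch")
    .set("spark.test.executor.decommission.initial.sleep.millis", "5000")
  val sc = new SparkContext(conf)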

core/src/main/scala/org/apache/spark/internal/config/package.scala (10 additions, 0 deletions)

@@ -1877,6 +1877,16 @@ package object config {
     .timeConf(TimeUnit.SECONDS)
     .createOptional
 
+  private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL =
+    ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL")
+      .doc("Duration for which a decommissioned executor's information will be kept after its " +
+        "removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " +
+        "decommissioning even after the mapper executor has been decommissioned. This allows " +
+        "eager recovery from fetch failures caused by decommissioning, increasing job robustness.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("5m")
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
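
Because the new entry is declared with timeConf(TimeUnit.SECONDS), it accepts any Spark time string and is read back as a number of seconds. A minimal sketch of overriding the 5m default (the 10m value is illustrative):

  import org.apache.spark.SparkConf

  // Sketch: remember decommissioned-executor info for 10 minutes instead of 5.
  // "10m", "600s", and a bare "600" (seconds) all parse to the same TTL.
  val conf = new SparkConf()
    .set("spark.executor.decommission.removed.infoCacheTTL", "10m")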

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

@@ -148,7 +148,7 @@ private[spark] class TaskSchedulerImpl(
   // decommissioned.
   lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
     .expireAfterWrite(
-      conf.getLong("spark.decommissioningRememberAfterRemoval.seconds", 60L), TimeUnit.SECONDS)
+      conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS)
     .ticker(new Ticker{
       override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
     })
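
The Ticker override above is what lets the cache's TTL follow Spark's Clock instead of wall time, so tests can advance expiry deterministically. A self-contained sketch of the same Guava pattern, with an AtomicLong standing in for Spark's Clock (the fake clock is an assumption, not part of this commit):

  import java.util.concurrent.TimeUnit
  import java.util.concurrent.atomic.AtomicLong

  import com.google.common.base.Ticker
  import com.google.common.cache.CacheBuilder

  // A fake clock that the test advances by hand.
  val clockMillis = new AtomicLong(0L)

  val cache = CacheBuilder.newBuilder()
    .expireAfterWrite(300L, TimeUnit.SECONDS) // matches the 5m default TTL
    .ticker(new Ticker {
      // Guava reads time in nanoseconds; convert from the millisecond clock.
      override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clockMillis.get())
    })
    .build[String, java.lang.Boolean]()

  cache.put("executor-1", java.lang.Boolean.TRUE)
  clockMillis.addAndGet(301L * 1000L) // advance the fake clock past the TTL
  assert(cache.getIfPresent("executor-1") == null) // the entry has expired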

core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala

@@ -207,7 +207,7 @@ class DecommissionWorkerSuite
     createWorkers(2)
     sc = createSparkContext(
       config.Tests.TEST_NO_STAGE_RETRY.key -> "false",
-      "spark.executor.decommission.initial.sleep.millis" -> initialSleepMillis.toString,
+      "spark.test.executor.decommission.initial.sleep.millis" -> initialSleepMillis.toString,
       config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key -> "true")
     val executorIdToWorkerInfo = getExecutorToWorkerAssignments
     val executorToDecom = executorIdToWorkerInfo.keysIterator.next
