[SPARK-26268][CORE] Do not resubmit tasks when executors are lost #24462

Closed · wants to merge 1 commit
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -216,11 +216,13 @@ private[spark] class ExecutorAllocationManager(
if (cachedExecutorIdleTimeoutS < 0) {
throw new SparkException(s"${DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
}
-// Require external shuffle service for dynamic allocation
+// Require external shuffle for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
-if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
-  throw new SparkException("Dynamic allocation of executors requires the external " +
-    "shuffle service. You may enable this through spark.shuffle.service.enabled.")
+if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) &&
+    !conf.get(config.EXTERNAL_SHUFFLE_ENABLED) && !testing) {
+  throw new SparkException("Dynamic allocation of executors requires external " +
+    "shuffle. You may enable this through spark.shuffle.service.enabled or " +
+    "spark.shuffle.external.enabled.")
}

if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
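For reference, a minimal sketch of how an application could satisfy the relaxed check, assuming the spark.shuffle.external.enabled flag proposed in this patch (the object name, app name, and surrounding job code are illustrative only; the master would be supplied via spark-submit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DynamicAllocationWithExternalShuffle {
  def main(args: Array[String]): Unit = {
    // Dynamic allocation without the built-in shuffle service: the new flag
    // declares that shuffle data is managed outside the executors.
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-external-shuffle")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "false")
      .set("spark.shuffle.external.enabled", "true") // flag added by this patch
    val sc = new SparkContext(conf)
    // ... job code ...
    sc.stop()
  }
}
```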
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -366,6 +366,12 @@ package object config {
.booleanConf
.createWithDefault(true)

+private[spark] val EXTERNAL_SHUFFLE_ENABLED =
+  ConfigBuilder("spark.shuffle.external.enabled")
+    .doc("Whether the shuffle manager is running externally to the Spark deployment.")
+    .booleanConf
+    .createWithDefault(false)

private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)

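As a sanity check, the new entry would be read the same way as other ConfigEntry definitions in core. A sketch only: both the entry and SparkConf.get(ConfigEntry) are private[spark], so this compiles only from code inside the org.apache.spark package, and the object name here is hypothetical:

```scala
package org.apache.spark

import org.apache.spark.internal.config

object ExternalShuffleFlagCheck {
  // Reads the proposed entry; defaults to false when the key is unset.
  def isExternallyManaged(conf: SparkConf): Boolean =
    conf.get(config.EXTERNAL_SHUFFLE_ENABLED)
}
```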
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1791,7 +1791,8 @@ private[spark] class DAGScheduler(
// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
-val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
+val fileLost = !sc.conf.get(config.EXTERNAL_SHUFFLE_ENABLED) &&
+  (workerLost || !env.blockManager.externalShuffleServiceEnabled)
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
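To make the new condition easier to read, here is a hypothetical standalone restatement of the same boolean logic (names are illustrative, not part of the patch):

```scala
object FileLostSketch {
  // Shuffle files are considered lost (and map output unregistered) only when
  // the lost executor/worker could actually have been holding them, i.e. the
  // shuffle is NOT managed externally to the deployment.
  def fileLost(
      externalShuffleManaged: Boolean, // spark.shuffle.external.enabled
      workerLost: Boolean,
      externalShuffleServiceEnabled: Boolean): Boolean =
    !externalShuffleManaged && (workerLost || !externalShuffleServiceEnabled)
}
```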
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -998,6 +998,7 @@ private[spark] class TaskSetManager(
// The reason is the next stage wouldn't be able to fetch the data from this dead executor
// so we would need to rerun these tasks on other executors.
if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
+&& !conf.get(config.EXTERNAL_SHUFFLE_ENABLED)
&& !isZombie) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
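Likewise, a hypothetical restatement of the updated resubmission guard (illustrative names only): with either the external shuffle service or the new external-shuffle flag enabled, completed ShuffleMapTasks on the dead executor are not rerun.

```scala
object ResubmitSketch {
  // A finished ShuffleMapTask from the dead executor is rerun only when its
  // map output lived on that executor: no external shuffle service, no
  // externally managed shuffle, and the task set is still active.
  def shouldResubmit(
      isShuffleMapTask: Boolean,
      externalShuffleServiceEnabled: Boolean,
      externalShuffleManaged: Boolean, // spark.shuffle.external.enabled
      isZombie: Boolean): Boolean =
    isShuffleMapTask && !externalShuffleServiceEnabled &&
      !externalShuffleManaged && !isZombie
}
```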