From 8e2e0f3bc00cb34bf2672ea95faa25f327621559 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Wed, 29 Apr 2026 23:05:27 +0000 Subject: [PATCH] [SPARK-56575][CORE] Extract scheduleResubmit() helper to remove identical code blocks Co-authored-by: Isaac --- .../apache/spark/scheduler/DAGScheduler.scala | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f3958bfddec9..22720b98aafd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -325,6 +325,16 @@ private[spark] class DAGScheduler( private val messageScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message") + private def scheduleResubmit(): Unit = { + messageScheduler.schedule( + new Runnable { + override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, + DAGScheduler.RESUBMIT_TIMEOUT, + TimeUnit.MILLISECONDS + ) + } + private[spark] var eventProcessLoop = new DAGSchedulerEventProcessLoop(this) // Used for test only. Some tests uses the same thread of the event poster to // process the events to ensure the deterministic behavior during the test. @@ -2179,13 +2189,7 @@ private[spark] class DAGScheduler( if (noResubmitEnqueued) { logInfo(log"Resubmitting ${MDC(FAILED_STAGE, stage)} " + log"(${MDC(FAILED_STAGE_NAME, stage.name)}) due to rollback.") - messageScheduler.schedule( - new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, - DAGScheduler.RESUBMIT_TIMEOUT, - TimeUnit.MILLISECONDS - ) + scheduleResubmit() } } @@ -2510,13 +2514,7 @@ private[spark] class DAGScheduler( log"Resubmitting ${MDC(STAGE, mapStage)} " + log"(${MDC(STAGE_NAME, mapStage.name)}) and ${MDC(FAILED_STAGE, failedStage)} " + log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) due to fetch failure") - messageScheduler.schedule( - new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, - DAGScheduler.RESUBMIT_TIMEOUT, - TimeUnit.MILLISECONDS - ) + scheduleResubmit() } } @@ -2623,9 +2621,7 @@ private[spark] class DAGScheduler( if (noResubmitEnqueued) { logInfo(log"Resubmitting ${MDC(FAILED_STAGE, failedStage)} " + log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) due to barrier stage failure.") - messageScheduler.schedule(new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS) + scheduleResubmit() } } }