diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c295dd4f0aed8..b3ddc810b3e47 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -304,7 +304,7 @@ private[spark] class TaskSetManager( list: ArrayBuffer[Int], speculative: Boolean = false): Option[Int] = { // Gets preferred task ranking. Otherwise, dequeue from the tail of the list. - val rankedIndexOffsets = schedulingPlungin.map(_.rankTasks(execId, host, tasks, list)) + val rankedIndexOffsets = schedulingPlungin.map(_.rankTasks(execId, host, tasks, list.toSeq)) .getOrElse(Range(list.size - 1, -1, -1)) rankedIndexOffsets.foreach { indexOffset => diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 624b5145537d0..5fe0f5d974007 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -2281,6 +2281,31 @@ class TaskSetManagerSuite createTaskResult(scheduledTask2.taskId.toInt, accumUpdates)) assert(sched.endedTasks(scheduledTask2.index) === Success) } + + test("SPARK-35022: skip excluded executor/node") { + sc = new SparkContext("local", "test") + sc.conf.set(config.TASK_SCHEDULING_PLUGIN_CLASSNAME, classOf[TestSchedulingPlugin].getName) + + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + val taskSet = FakeTask.createTaskSet(5) + + val clock = new ManualClock + val mockListenerBus = mock(classOf[LiveListenerBus]) + val healthTracker = new HealthTracker(mockListenerBus, conf, None, clock) + + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, + Some(healthTracker), clock = clock) + manager.taskSetExcludelistHelperOpt.foreach { excludeList => + excludeList.updateExcludedForFailedTask("host1", "exec1", 4, "failed task") + } + + // Task index 4 will be skipped as it cannot be scheduled on host1/exec1. + // Next ranked task is index 0. + val err = intercept[IllegalStateException] { + manager.resourceOffer("exec1", "host1", NO_PREF) + } + assert(err.getMessage.contains("scheduled task index is 0")) + } } class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) { @@ -2310,7 +2335,7 @@ class TestSchedulingPlugin extends TaskSchedulingPlugin { override def informScheduledTask(message: TaskScheduledResult): Unit = { if (topRanked != -1 && topRanked != message.scheduledTaskIndex) { - throw new IllegalStateException(s"scheduled task index must be ${message.scheduledTaskIndex}") + throw new IllegalStateException(s"scheduled task index is ${message.scheduledTaskIndex}") } } }