Skip to content

Commit

Permalink
Add test.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Apr 13, 2021
1 parent ae9a8cb commit 3097e73
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Expand Up @@ -304,7 +304,7 @@ private[spark] class TaskSetManager(
list: ArrayBuffer[Int],
speculative: Boolean = false): Option[Int] = {
// Gets preferred task ranking. Otherwise, dequeue from the tail of the list.
val rankedIndexOffsets = schedulingPlungin.map(_.rankTasks(execId, host, tasks, list))
val rankedIndexOffsets = schedulingPlungin.map(_.rankTasks(execId, host, tasks, list.toSeq))
.getOrElse(Range(list.size - 1, -1, -1))

rankedIndexOffsets.foreach { indexOffset =>
Expand Down
Expand Up @@ -2281,6 +2281,31 @@ class TaskSetManagerSuite
createTaskResult(scheduledTask2.taskId.toInt, accumUpdates))
assert(sched.endedTasks(scheduledTask2.index) === Success)
}

test("SPARK-35022: skip excluded executor/node") {
sc = new SparkContext("local", "test")
sc.conf.set(config.TASK_SCHEDULING_PLUGIN_CLASSNAME, classOf[TestSchedulingPlugin].getName)

sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(5)

val clock = new ManualClock
val mockListenerBus = mock(classOf[LiveListenerBus])
val healthTracker = new HealthTracker(mockListenerBus, conf, None, clock)

val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES,
Some(healthTracker), clock = clock)
manager.taskSetExcludelistHelperOpt.foreach { excludeList =>
excludeList.updateExcludedForFailedTask("host1", "exec1", 4, "failed task")
}

// Task index 4 will be skipped as it cannot be scheduled on host1/exec1.
// Next ranked task is index 0.
val err = intercept[IllegalStateException] {
manager.resourceOffer("exec1", "host1", NO_PREF)
}
assert(err.getMessage.contains("scheduled task index is 0"))
}
}

class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) {
Expand Down Expand Up @@ -2310,7 +2335,7 @@ class TestSchedulingPlugin extends TaskSchedulingPlugin {

override def informScheduledTask(message: TaskScheduledResult): Unit = {
if (topRanked != -1 && topRanked != message.scheduledTaskIndex) {
throw new IllegalStateException(s"scheduled task index must be ${message.scheduledTaskIndex}")
throw new IllegalStateException(s"scheduled task index is ${message.scheduledTaskIndex}")
}
}
}

0 comments on commit 3097e73

Please sign in to comment.