Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Ngone51 committed Aug 31, 2020
1 parent 3152e9b commit a0bc4f6
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -849,8 +849,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi

private val shuffleFileLossTests = Seq(
("executor process lost with shuffle service", ExecutorProcessLost("", None), true, false),
("worker lost with shuffle service", ExecutorProcessLost("", None), true, true),
("worker lost without shuffle service", ExecutorProcessLost("", None), false, true),
("worker lost with shuffle service", ExecutorProcessLost("", Some("hostA")), true, true),
("worker lost without shuffle service", ExecutorProcessLost("", Some("hostA")), false, true),
("executor failure with shuffle service", ExecutorKilled, true, false),
("executor failure without shuffle service", ExecutorKilled, false, true))

Expand All @@ -874,10 +874,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
completeShuffleMapStageSuccessfully(0, 0, 1)
val expectHostFileLoss = event match {
case ExecutorProcessLost(_, hostOpt, _) => hostOpt.isDefined
case _ => false
}
runEvent(ExecutorLost("hostA-exec", event))
verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
if (expectFileLoss) {
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
if (expectHostFileLoss) {
verify(mapOutputTracker, times(1)).removeOutputsOnHost("hostA")
} else {
verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
}
intercept[MetadataFetchFailedException] {
mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1864,18 +1864,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
test("scheduler should keep the decommission state where host was decommissioned") {
val clock = new ManualClock(10000L)
val scheduler = setupSchedulerForDecommissionTests(clock, 2)
val oldTime = clock.getTimeMillis()
val decomTime = clock.getTimeMillis()
scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", None))
scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", Some("host1")))

clock.advance(3000L)
scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0 new", None))
scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1 new", None))

assert(scheduler.getExecutorDecommissionState("executor0")
=== Some(ExecutorDecommissionState(oldTime, None)))
=== Some(ExecutorDecommissionState(decomTime, None)))
assert(scheduler.getExecutorDecommissionState("executor1")
=== Some(ExecutorDecommissionState(oldTime, Some("host1"))))
=== Some(ExecutorDecommissionState(decomTime, Some("host1"))))
assert(scheduler.getExecutorDecommissionState("executor2").isEmpty)
}

Expand Down Expand Up @@ -1914,17 +1910,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(manager.copiesRunning.take(2) === Array(0, 0))
clock.advance(2000)

// Decommission state should hang around a bit after removal ...
assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None))
clock.advance(2000)
assert(scheduler.getExecutorDecommissionState("executor1").isDefined)

// The default timeout for expiry is 300k milliseconds (5 minutes) which completes now,
// and the executor1's decommission state should finally be purged.
clock.advance(300000)
assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)

// Now give it some resources and both tasks should be rerun
val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq(
WorkerOffer("executor2", "host2", 1), WorkerOffer("executor3", "host3", 1))).flatten
Expand Down

0 comments on commit a0bc4f6

Please sign in to comment.