
Commit

Fix compilation
HeartSaVioR committed Sep 18, 2019
1 parent dea3952 commit 0c7f990
Showing 2 changed files with 13 additions and 11 deletions.
CoarseGrainedSchedulerBackendSuite.scala
@@ -46,6 +46,8 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, Utils}
 class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext
     with Eventually {
 
+  private val executorUpTimeout = 1.minute
+
   test("serialized task larger than max RPC message size") {
     val conf = new SparkConf
     conf.set(RPC_MESSAGE_MAX_SIZE, 1)
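The new executorUpTimeout constant gives the suite a single, named bound for its polling assertions. The following is a minimal sketch, not the actual Spark suite: it assumes 1.minute resolves to a ScalaTest Span via org.scalatest.time.SpanSugar (the real file may source it from scala.concurrent.duration instead) and shows how such a constant typically drives ScalaTest's eventually:

import org.scalatest.concurrent.Eventually
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time.SpanSugar._

// Sketch only: eventually(...) retries the assertion block until it
// passes or executorUpTimeout expires.
class ExecutorUpTimeoutSketch extends AnyFunSuite with Eventually {
  private val executorUpTimeout = 1.minute

  test("eventually retries until the condition holds") {
    @volatile var executorUp = false
    // Simulate an executor registering a moment after the test starts.
    new Thread(() => { Thread.sleep(100); executorUp = true }).start()

    eventually(timeout(executorUpTimeout)) {
      assert(executorUp, "executor did not come up within the timeout")
    }
  }
}

Hoisting the duration into one field means every eventually block in the suite shares the same bound and a future tuning change touches a single line.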
SparkListenerSuite.scala
@@ -245,7 +245,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     rdd3.setName("Trois")
 
     rdd1.count()
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     listener.stageInfos.size should be {1}
     val stageInfo1 = listener.stageInfos.keys.find(_.stageId == 0).get
     stageInfo1.rddInfos.size should be {1} // ParallelCollectionRDD
@@ -254,7 +254,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     listener.stageInfos.clear()
 
     rdd2.count()
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     listener.stageInfos.size should be {1}
     val stageInfo2 = listener.stageInfos.keys.find(_.stageId == 1).get
     stageInfo2.rddInfos.size should be {3}
@@ -263,7 +263,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     listener.stageInfos.clear()
 
     rdd3.count()
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     listener.stageInfos.size should be {2} // Shuffle map stage + result stage
     val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get
     stageInfo3.rddInfos.size should be {1} // ShuffledRDD
@@ -279,7 +279,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val rdd2 = rdd1.map(_.toString)
     sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1))
 
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
 
     listener.stageInfos.size should be {1}
     val (stageInfo, _) = listener.stageInfos.head
@@ -307,7 +307,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val numSlices = 16
     val d = sc.parallelize(0 to 10000, numSlices).map(w)
     d.count()
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     listener.stageInfos.size should be (1)
 
     val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
@@ -318,7 +318,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     d4.setName("A Cogroup")
     d4.collectAsMap()
 
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     listener.stageInfos.size should be (4)
     listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) =>
       /**
@@ -369,7 +369,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
       .reduce { case (x, y) => x }
     assert(result === 1.to(maxRpcMessageSize).toArray)
 
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     val TASK_INDEX = 0
     assert(listener.startedTasks.contains(TASK_INDEX))
     assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
@@ -385,7 +385,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
     assert(result === 2)
 
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    sc.listenerBus.waitUntilEmpty()
     val TASK_INDEX = 0
     assert(listener.startedTasks.contains(TASK_INDEX))
     assert(listener.startedGettingResultTasks.isEmpty)
@@ -440,7 +440,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
-    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    bus.waitUntilEmpty()
 
     // The exception should be caught, and the event should be propagated to other listeners
     assert(jobCounter1.count === 5)
@@ -510,7 +510,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     // after we post one event, both interrupting listeners should get removed, and the
     // event log queue should be removed
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    bus.waitUntilEmpty()
     assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
     assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
     assert(bus.findListenersByClass[InterruptingListener]().size === 0)
@@ -519,7 +519,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
     // posting more events should be fine, they'll just get processed from the OK queue.
     (0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
-    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    bus.waitUntilEmpty()
     assert(counter1.count === 6)
     assert(counter2.count === 6)
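Every change in SparkListenerSuite is the same mechanical substitution: the timed waitUntilEmpty(WAIT_TIMEOUT_MILLIS) call becomes the no-argument waitUntilEmpty(). A plausible reading, though the diff itself does not show the bus's definition, is that the no-arg overload now owns a default timeout, so call sites stop carrying the constant. The sketch below uses a hypothetical SketchListenerBus (not Spark's LiveListenerBus) with an assumed 10-second default to illustrate that overload pattern:

import java.util.concurrent.{ConcurrentLinkedQueue, TimeoutException}

// Hypothetical stand-in for an event bus whose no-arg waitUntilEmpty()
// supplies a default timeout, so tests no longer pass one at every call.
class SketchListenerBus {
  private val queue = new ConcurrentLinkedQueue[String]()

  def post(event: String): Unit = queue.add(event)

  // Stand-in for the background thread that drains the queue in a real bus.
  def drainAll(): Unit = while (queue.poll() != null) {}

  // Old call shape: every caller supplied the timeout explicitly.
  def waitUntilEmpty(timeoutMillis: Long): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (!queue.isEmpty) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(s"queue not empty after $timeoutMillis ms")
      }
      Thread.sleep(10)
    }
  }

  // New call shape: the default lives in one place (10 seconds is assumed,
  // not taken from Spark's source).
  def waitUntilEmpty(): Unit = waitUntilEmpty(10000L)
}

With this shape, the eleven call sites in the diff collapse to bus.waitUntilEmpty(), while any test that genuinely needs a custom bound can still pass one.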
