Skip to content

Commit

Permalink
Fix flaky tests in ExternalShuffleServiceSuite and SparkListenerWithC…
Browse files Browse the repository at this point in the history
…lusterSuite
  • Loading branch information
zsxwing committed Jun 1, 2015
1 parent 4b5f12b commit 3b69840
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package org.apache.spark.ui.jobs

import java.util.concurrent.TimeoutException

import scala.collection.mutable.{HashMap, HashSet, ListBuffer}

import com.google.common.annotations.VisibleForTesting

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
Expand Down Expand Up @@ -526,4 +530,30 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
/** Records the start time carried by the application-start event. */
override def onApplicationStart(appStarted: SparkListenerApplicationStart): Unit = {
  startTime = appStarted.time
}

/**
 * For testing only. Wait until at least `numExecutors` executors are up, or throw
 * `TimeoutException` if the waiting time elapses before `numExecutors` executors are up.
 *
 * The condition is always checked at least once (even when `timeout` is zero or negative) and
 * is re-checked after every sleep, so the method never times out while the executors are
 * already registered.
 *
 * @param numExecutors the minimum number of executors to wait for
 * @param timeout time to wait in milliseconds
 * @throws TimeoutException if fewer than `numExecutors` executors are up once `timeout`
 *                          milliseconds have elapsed
 */
@VisibleForTesting
private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = {
  // Need to count the block manager in driver
  val requiredBlockManagers = numExecutors + 1

  // Measure elapsed time with the monotonic clock: the wall clock can jump, and computing
  // "now + timeout" up front can overflow for very large timeouts.
  val startNanos = System.nanoTime()
  def elapsedMillis: Long = (System.nanoTime() - startNanos) / 1000000L

  // Read the set size while holding this listener's lock.
  def executorsUp: Boolean = {
    val numBlockManagers = synchronized {
      blockManagerIds.size
    }
    numBlockManagers >= requiredBlockManagers
  }

  var ready = executorsUp
  while (!ready && elapsedMillis < timeout) {
    // Sleep rather than using wait/notify, because this is used only for testing and wait/notify
    // add overhead in the general case.
    Thread.sleep(10)
    ready = executorsUp
  }
  if (!ready) {
    throw new TimeoutException(
      s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])

// On a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
// If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
// all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
// local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
// In this case, we won't receive FetchFailed, and that will make this test fail.
// Therefore, we should wait until all slaves are up.
sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)

val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)

rdd.count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

package org.apache.spark.broadcast

import scala.concurrent.duration._
import scala.util.Random

import org.scalatest.Assertions
import org.scalatest.concurrent.Eventually._

import org.apache.spark._
import org.apache.spark.io.SnappyCompressionCodec
Expand Down Expand Up @@ -312,13 +310,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
val _sc =
new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", broadcastConf)
// Wait until all slaves are up
eventually(timeout(10.seconds), interval(10.milliseconds)) {
_sc.jobProgressListener.synchronized {
val numBlockManagers = _sc.jobProgressListener.blockManagerIds.size
assert(numBlockManagers == numSlaves + 1,
s"Expect ${numSlaves + 1} block managers, but was ${numBlockManagers}")
}
}
_sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 10000)
_sc
} else {
new SparkContext("local", "test", broadcastConf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.spark.scheduler

import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
import scala.collection.mutable

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

import scala.collection.mutable
import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.cluster.ExecutorInfo

/**
* Unit tests for SparkListener that require a local cluster.
Expand All @@ -41,6 +41,10 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)

// This test checks whether the number of executors received by "SparkListener" is the same as
// the number of all executors, so we need to wait until all executors are up
sc.jobProgressListener.waitUntilExecutorsUp(2, 10000)

val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
rdd2.setName("Target RDD")
Expand Down

0 comments on commit 3b69840

Please sign in to comment.