[SPARK-29161][CORE][SQL][STREAMING] Unify default wait time for waitUntilEmpty #25837

core/src/main/scala/org/apache/spark/TestUtils.scala (1 addition & 1 deletion)
@@ -264,7 +264,7 @@ private[spark] object TestUtils {
try {
body(listener)
} finally {
- sc.listenerBus.waitUntilEmpty(TimeUnit.SECONDS.toMillis(10))
+ sc.listenerBus.waitUntilEmpty()
sc.listenerBus.removeListener(listener)
}
}
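
For context, the helper around this hunk is TestUtils.withListener, which drains the bus before detaching the listener so assertions in `body` never race the event queue. A hedged reconstruction of its full shape (the signature is inferred from the visible lines, not copied from the patch):

  def withListener[L <: SparkListener](sc: SparkContext, listener: L)(body: L => Unit): Unit = {
    sc.addSparkListener(listener)
    try {
      body(listener)
    } finally {
      // Now waits with the unified 10-second default instead of a per-call timeout.
      sc.listenerBus.waitUntilEmpty()
      sc.listenerBus.removeListener(listener)
    }
  }
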
@@ -186,6 +186,17 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
metricsSystem.registerSource(metrics)
}

+ /**
+  * For testing only. Wait until there are no more events in the queue, or until the default
+  * wait time has elapsed. Throw `TimeoutException` if the queue has not emptied before the
+  * default wait time elapses.
+  */
+ @throws(classOf[TimeoutException])
+ private[spark] def waitUntilEmpty(): Unit = {
+   waitUntilEmpty(TimeUnit.SECONDS.toMillis(10))
+ }

/**
* For testing only. Wait until there are no more events in the queue, or until the specified
* time has elapsed. Throw `TimeoutException` if the specified time elapsed before the queue
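
With this hunk, tests have two entry points: the new no-arg overload with the unified 10-second default, and the existing timed overload. A minimal sketch of the intended usage (the posted event and the 2-second bound are illustrative; assumes `java.util.concurrent.TimeUnit` is imported):

  // Default: drain the queue, failing with TimeoutException after 10 seconds.
  sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
  sc.listenerBus.waitUntilEmpty()

  // The timed overload remains for tests that need a different bound.
  sc.listenerBus.waitUntilEmpty(TimeUnit.SECONDS.toMillis(2))
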
@@ -126,7 +126,7 @@ private[spark] object AccumulatorSuite {
sc.addSparkListener(listener)
testBody
// wait until all events have been processed before proceeding to assert things
- sc.listenerBus.waitUntilEmpty(10 * 1000)
+ sc.listenerBus.waitUntilEmpty()
val accums = listener.getCompletedStageInfos.flatMap(_.accumulables.values)
val isSet = accums.exists { a =>
a.name == Some(PEAK_EXECUTION_MEMORY) && a.value.exists(_.asInstanceOf[Long] > 0L)
@@ -64,7 +64,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {

private def post(event: SparkListenerEvent): Unit = {
listenerBus.post(event)
- listenerBus.waitUntilEmpty(1000)
+ listenerBus.waitUntilEmpty()
}

test("initialize dynamic allocation in SparkContext") {
core/src/test/scala/org/apache/spark/ShuffleSuite.scala (1 addition & 1 deletion)
@@ -498,7 +498,7 @@ object ShuffleSuite {

job

- sc.listenerBus.waitUntilEmpty(500)
+ sc.listenerBus.waitUntilEmpty()
AggregatedShuffleMetrics(recordsWritten, recordsRead, bytesWritten, bytesRead)
}
}
@@ -63,7 +63,7 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
assert(sc.getRDDStorageInfo.length === 0)
rdd.collect()
- sc.listenerBus.waitUntilEmpty(10000)
+ sc.listenerBus.waitUntilEmpty()
eventually(timeout(10.seconds), interval(100.milliseconds)) {
assert(sc.getRDDStorageInfo.length === 1)
}
@@ -716,7 +716,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
x
}.collect()
- sc.listenerBus.waitUntilEmpty(10000)
+ sc.listenerBus.waitUntilEmpty()
// As executors will send the metrics of running tasks via heartbeat, we can use this to check
// whether there is any running task.
eventually(timeout(10.seconds)) {
@@ -29,9 +29,6 @@ import org.apache.spark.util.SparkConfWithEnv

class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {

- /** Length of time to wait while draining listener events. */
- private val WAIT_TIMEOUT_MILLIS = 10000
-
test("verify that correct log urls get propagated from workers") {
sc = new SparkContext("local-cluster[2,1,1024]", "test")

@@ -41,7 +38,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()

- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
// Browse to each URL to check that it's valid
@@ -61,7 +58,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()

- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
assert(listeners.size === 1)
val listener = listeners(0)
@@ -182,7 +182,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
.reduceByKey(_ + _)
.saveAsTextFile(tmpFile.toURI.toString)

- sc.listenerBus.waitUntilEmpty(500)
+ sc.listenerBus.waitUntilEmpty()
assert(inputRead == numRecords)

assert(outputWritten == numBuckets)
@@ -243,7 +243,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
val taskMetrics = new ArrayBuffer[Long]()

// Avoid receiving earlier taskEnd events
- sc.listenerBus.waitUntilEmpty(500)
+ sc.listenerBus.waitUntilEmpty()

sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -253,7 +253,7 @@

job

- sc.listenerBus.waitUntilEmpty(500)
+ sc.listenerBus.waitUntilEmpty()
taskMetrics.sum
}

@@ -293,7 +293,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext

try {
rdd.saveAsTextFile(outPath.toString)
- sc.listenerBus.waitUntilEmpty(500)
+ sc.listenerBus.waitUntilEmpty()
assert(taskBytesWritten.length == 2)
val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>
@@ -172,9 +172,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
override def applicationAttemptId(): Option[String] = None
}

- /** Length of time to wait while draining listener events. */
- val WAIT_TIMEOUT_MILLIS = 10000
-
/**
* Listeners which records some information to verify in UTs. Getter-kind methods in this class
* ensures the value is returned after ensuring there's no event to process, as well as the
@@ -230,7 +227,7 @@
_endedTasks.toSet
}

- private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty()
}

var sparkListener: EventInfoRecordingListener = null
@@ -839,7 +836,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
val testRdd = new MyRDD(sc, 0, Nil)
val waiter = scheduler.submitJob(testRdd, func, Seq.empty, CallSite.empty,
resultHandler, properties)
- sc.listenerBus.waitUntilEmpty(1000L)
+ sc.listenerBus.waitUntilEmpty()
assert(assertionError.get() === null)
}

@@ -957,7 +954,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
completeNextResultStageWithSuccess(1, 1)

// Confirm job finished successfully
- sc.listenerBus.waitUntilEmpty(1000)
+ sc.listenerBus.waitUntilEmpty()
assert(ended)
assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
assertDataStructuresEmpty()
@@ -994,7 +991,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
} else {
// Stage should have been aborted and removed from running stages
assertDataStructuresEmpty()
- sc.listenerBus.waitUntilEmpty(1000)
+ sc.listenerBus.waitUntilEmpty()
assert(ended)
jobResult match {
case JobFailed(reason) =>
@@ -1116,7 +1113,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
completeNextResultStageWithSuccess(2, 1)

assertDataStructuresEmpty()
- sc.listenerBus.waitUntilEmpty(1000)
+ sc.listenerBus.waitUntilEmpty()
assert(ended)
assert(results === Map(0 -> 42))
}
@@ -1175,7 +1172,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// Complete the result stage.
completeNextResultStageWithSuccess(1, 1)

- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
assertDataStructuresEmpty()
}

@@ -1204,7 +1201,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
// Complete the result stage.
completeNextResultStageWithSuccess(1, 0)

- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
assertDataStructuresEmpty()
}

@@ -1230,7 +1227,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
null))

// Assert the stage has been cancelled.
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
assert(failure.getMessage.startsWith("Job aborted due to stage failure: Could not recover " +
"from a failed barrier ResultStage."))
}
@@ -2668,7 +2665,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
sc.parallelize(1 to tasks, tasks).foreach { _ =>
accum.add(1L)
}
- sc.listenerBus.waitUntilEmpty(1000)
+ sc.listenerBus.waitUntilEmpty()
assert(foundCount.get() === tasks)
}
}
@@ -38,9 +38,6 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

import LiveListenerBus._

- /** Length of time to wait while draining listener events. */
- val WAIT_TIMEOUT_MILLIS = 10000
-
val jobCompletionTime = 1421191296660L

private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext])
@@ -65,7 +62,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

sc.listenerBus.addToSharedQueue(listener)
sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
sc.stop()

assert(listener.sparkExSeen)
@@ -97,7 +94,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Starting listener bus should flush all buffered events
bus.start(mockSparkContext, mockMetricsSystem)
Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
- bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ bus.waitUntilEmpty()
assert(counter.count === 5)
assert(sharedQueueSize(bus) === 0)
assert(eventProcessingTimeCount(bus) === 5)
@@ -223,7 +220,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
rdd2.setName("Target RDD")
rdd2.count()

- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()

listener.stageInfos.size should be {1}
val (stageInfo, taskInfoMetrics) = listener.stageInfos.head
@@ -248,7 +245,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
rdd3.setName("Trois")

rdd1.count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
listener.stageInfos.size should be {1}
val stageInfo1 = listener.stageInfos.keys.find(_.stageId == 0).get
stageInfo1.rddInfos.size should be {1} // ParallelCollectionRDD
@@ -257,7 +254,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.stageInfos.clear()

rdd2.count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
listener.stageInfos.size should be {1}
val stageInfo2 = listener.stageInfos.keys.find(_.stageId == 1).get
stageInfo2.rddInfos.size should be {3}
@@ -266,7 +263,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.stageInfos.clear()

rdd3.count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
listener.stageInfos.size should be {2} // Shuffle map stage + result stage
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get
stageInfo3.rddInfos.size should be {1} // ShuffledRDD
@@ -282,7 +279,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val rdd2 = rdd1.map(_.toString)
sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1))

- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()

listener.stageInfos.size should be {1}
val (stageInfo, _) = listener.stageInfos.head
@@ -310,7 +307,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val numSlices = 16
val d = sc.parallelize(0 to 10000, numSlices).map(w)
d.count()
- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
listener.stageInfos.size should be (1)

val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
@@ -321,7 +318,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
d4.setName("A Cogroup")
d4.collectAsMap()

- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
listener.stageInfos.size should be (4)
listener.stageInfos.foreach { case (stageInfo, taskInfoMetrics) =>
/**
@@ -372,7 +369,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
.reduce { case (x, y) => x }
assert(result === 1.to(maxRpcMessageSize).toArray)

- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
val TASK_INDEX = 0
assert(listener.startedTasks.contains(TASK_INDEX))
assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
@@ -388,7 +385,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)

- sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ sc.listenerBus.waitUntilEmpty()
val TASK_INDEX = 0
assert(listener.startedTasks.contains(TASK_INDEX))
assert(listener.startedGettingResultTasks.isEmpty)
@@ -443,7 +440,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
- bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ bus.waitUntilEmpty()

// The exception should be caught, and the event should be propagated to other listeners
assert(jobCounter1.count === 5)
@@ -513,7 +510,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// after we post one event, both interrupting listeners should get removed, and the
// event log queue should be removed
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
- bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ bus.waitUntilEmpty()
assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
assert(bus.findListenersByClass[InterruptingListener]().size === 0)
@@ -522,7 +519,7 @@

// posting more events should be fine, they'll just get processed from the OK queue.
(0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
- bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+ bus.waitUntilEmpty()
assert(counter1.count === 6)
assert(counter2.count === 6)

@@ -849,7 +849,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSparkSessi
sparkContext.addSparkListener(jobListener)
try {
val result = f
- sparkContext.listenerBus.waitUntilEmpty(10000L)
+ sparkContext.listenerBus.waitUntilEmpty()
assert(numJobTrigered === 0)
result
} finally {
@@ -2114,7 +2114,7 @@ class DataFrameSuite extends QueryTest with SharedSparkSession {

val df = spark.read.json(path.getCanonicalPath)
assert(df.columns === Array("i", "p"))
- spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+ spark.sparkContext.listenerBus.waitUntilEmpty()
assert(numJobs == 1)
}
}
@@ -488,7 +488,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
sparkContext.addSparkListener(bytesReadListener)
try {
spark.read.csv(path).limit(1).collect()
- sparkContext.listenerBus.waitUntilEmpty(1000L)
+ sparkContext.listenerBus.waitUntilEmpty()
assert(bytesReads.sum === 7860)
} finally {
sparkContext.removeSparkListener(bytesReadListener)
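
One consequence of the unified default: waitUntilEmpty() fails with TimeoutException rather than returning a status, so tests that also wait on asynchronous state derived from the events still pair it with Scalatest's eventually, as SparkContextInfoSuite above does. A minimal sketch (the assertion body is illustrative):

  import org.scalatest.concurrent.Eventually._
  import org.scalatest.time.SpanSugar._

  sc.listenerBus.waitUntilEmpty()  // listener events are drained here, or the test fails after 10s
  eventually(timeout(10.seconds), interval(100.milliseconds)) {
    assert(sc.getRDDStorageInfo.length === 1)  // state updated by those events
  }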