From f7b87553316c7ef151ff94bc891cd18ddea55b20 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Thu, 1 Dec 2016 17:49:23 -0800
Subject: [PATCH] Address

---
 .../StreamingQueryListenerSuite.scala | 35 +++++++++----------
 1 file changed, 17 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 21b885c3297f2..6be3a2ed45a09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -47,6 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(spark.streams.active.isEmpty)
     assert(addedListeners.isEmpty)
     // Make sure we don't leak any events to the next test
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
   testQuietly("single listener, check trigger events are generated correctly") {
@@ -195,41 +196,39 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   test("only one progress event per interval when no data") {
     // This test will start a query but not push any data, and then check if we push too many events
     withSQLConf(SQLConf.STREAMING_NO_DATA_EVENT_INTERVAL.key -> "100ms") {
-      @volatile var progressEventCount = 0
-
+      @volatile var numProgressEvent = 0
       val listener = new StreamingQueryListener {
         override def onQueryStarted(event: QueryStartedEvent): Unit = {}
-
         override def onQueryProgress(event: QueryProgressEvent): Unit = {
-          progressEventCount += 1
+          numProgressEvent += 1
         }
-
         override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
       }
       spark.streams.addListener(listener)
       try {
+        val input = new MemoryStream[Int](0, sqlContext) {
+          @volatile var numTriggers = 0
+          override def getOffset: Option[Offset] = {
+            numTriggers += 1
+            super.getOffset
+          }
+        }
         val clock = new StreamManualClock()
         val actions = mutable.ArrayBuffer[StreamAction]()
         actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
+        for (_ <- 1 to 100) {
+          actions += AdvanceManualClock(10)
+        }
         actions += AssertOnQuery { _ =>
-          // It should report at least one progress
           eventually(timeout(streamingTimeout)) {
-            assert(progressEventCount > 0)
+            assert(input.numTriggers > 100) // at least 100 triggers have occurred
           }
           true
         }
-        for (_ <- 1 to 100) {
-          actions += AdvanceManualClock(10)
-          actions += AssertOnQuery { _ =>
-            // Sleep so that if the config `noDataEventInterval` doesn't work, it has enough time
-            // to report too many events.
-            Thread.sleep(10)
-            true
-          }
-        }
-        testStream(MemoryStream[Int].toDS)(actions: _*)
+        testStream(input.toDS)(actions: _*)
+        spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // 11 is the max value of the possible numbers of events.
-        assert(progressEventCount <= 11)
+        assert(numProgressEvent >= 1 && numProgressEvent <= 11)
       } finally {
         spark.streams.removeListener(listener)
       }
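
Note: below is a minimal, self-contained sketch of the counting trick the patch
uses, for reference. `PollableSource` and `CountingSource` are hypothetical
stand-ins for illustration, not Spark's Source API: a subclass overrides the
method the engine polls on every trigger and bumps a @volatile counter, so the
test thread reliably observes how many triggers the stream execution thread ran.

    // Sketch only: PollableSource is a made-up stand-in, not Spark's Source API.
    trait PollableSource {
      def getOffset: Option[Long]
    }

    // Counts every poll; the patch does the same inline with an anonymous
    // MemoryStream subclass.
    class CountingSource extends PollableSource {
      @volatile var numTriggers = 0
      override def getOffset: Option[Long] = {
        numTriggers += 1 // one increment per trigger
        None             // "no data", like the empty MemoryStream in the test
      }
    }

    object CountingSourceSketch {
      def main(args: Array[String]): Unit = {
        val source = new CountingSource
        (1 to 100).foreach(_ => source.getOffset) // stand-in for 100 AdvanceManualClock(10) actions
        assert(source.numTriggers == 100)
      }
    }

On the upper bound of 11: the 100 AdvanceManualClock(10) actions advance the
stream clock by 1000ms in total, and with STREAMING_NO_DATA_EVENT_INTERVAL set
to 100ms a no-data progress event can be emitted at most once per 100ms, i.e.
at most 1000 / 100 = 10 such events, plus the one initial progress event,
giving 11.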