Commit f7b8755

zsxwing committed Dec 2, 2016
1 parent 3425231 · commit f7b8755
Showing 1 changed file with 17 additions and 18 deletions.
@@ -47,6 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(spark.streams.active.isEmpty)
     assert(addedListeners.isEmpty)
     // Make sure we don't leak any events to the next test
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }

   testQuietly("single listener, check trigger events are generated correctly") {
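
Note on the hunk above: the added waitUntilEmpty(10000) call drains Spark's asynchronous listener bus before the next test starts. Events are posted to a queue and delivered on a separate thread, so a test can finish while its events are still in flight and trip assertions in a later test. A minimal, Spark-free sketch of that failure mode and the drain (ToyListenerBus and its members are illustrative stand-ins, not Spark APIs):

  import java.util.concurrent.{ConcurrentLinkedQueue, Executors}

  // Hypothetical stand-in for an async listener bus: post() returns immediately,
  // delivery happens later on a worker thread, so events can outlive the test
  // that produced them unless the queue is drained first.
  class ToyListenerBus {
    private val queue = new ConcurrentLinkedQueue[String]()
    private val worker = Executors.newSingleThreadExecutor()

    def post(event: String): Unit = {
      queue.add(event)
      worker.submit(new Runnable {
        override def run(): Unit = { Thread.sleep(5); queue.poll() } // slow async delivery
      })
    }

    // Mirror of the added call: block until every queued event is delivered,
    // failing if the bus is still busy after the timeout.
    def waitUntilEmpty(timeoutMillis: Long): Unit = {
      val deadline = System.currentTimeMillis() + timeoutMillis
      while (!queue.isEmpty) {
        require(System.currentTimeMillis() <= deadline, s"bus not empty after ${timeoutMillis}ms")
        Thread.sleep(1)
      }
    }
  }

Spark's LiveListenerBus delivers events the same way, from a queue on a dedicated thread, which is why the suite's after block must block until the queue is empty.
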
@@ -195,41 +196,39 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   test("only one progress event per interval when no data") {
     // This test will start a query but not push any data, and then check if we push too many events
     withSQLConf(SQLConf.STREAMING_NO_DATA_EVENT_INTERVAL.key -> "100ms") {
-      @volatile var progressEventCount = 0
-
+      @volatile var numProgressEvent = 0
       val listener = new StreamingQueryListener {
         override def onQueryStarted(event: QueryStartedEvent): Unit = {}

         override def onQueryProgress(event: QueryProgressEvent): Unit = {
-          progressEventCount += 1
+          numProgressEvent += 1
         }

         override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
       }
       spark.streams.addListener(listener)
       try {
+        val input = new MemoryStream[Int](0, sqlContext) {
+          @volatile var numTriggers = 0
+          override def getOffset: Option[Offset] = {
+            numTriggers += 1
+            super.getOffset
+          }
+        }
         val clock = new StreamManualClock()
         val actions = mutable.ArrayBuffer[StreamAction]()
         actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
         for (_ <- 1 to 100) {
           actions += AdvanceManualClock(10)
         }
         actions += AssertOnQuery { _ =>
-          // It should report at least one progress
           eventually(timeout(streamingTimeout)) {
-            assert(progressEventCount > 0)
+            assert(input.numTriggers > 100) // at least 100 triggers have occurred
           }
           true
         }
-        for (_ <- 1 to 100) {
-          actions += AdvanceManualClock(10)
-          actions += AssertOnQuery { _ =>
-            // Sleep so that if the config `noDataEventInterval` doesn't work, it has enough time
-            // to report too many events.
-            Thread.sleep(10)
-            true
-          }
-        }
-        testStream(MemoryStream[Int].toDS)(actions: _*)
+        testStream(input.toDS)(actions: _*)
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // 11 is the max value of the possible numbers of events.
-        assert(progressEventCount <= 11)
+        assert(numProgressEvent >= 1 && numProgressEvent <= 11)
       } finally {
         spark.streams.removeListener(listener)
       }
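
The core of the fix: the old test slept a fixed 10 ms per trigger and hoped the stream had kept up, exactly the timing assumption that makes a test flaky on a loaded CI machine. The new test counts real triggers by overriding getOffset on the MemoryStream, then polls with eventually until more than 100 have happened before checking the event count. A self-contained sketch of that pattern (CountingSource and this eventually are simplified stand-ins for the Spark source and ScalaTest's eventually, not the real APIs):

  import java.util.concurrent.atomic.AtomicInteger

  object DeterministicWait {
    // Stand-in for the MemoryStream subclass in the diff: every poll for an
    // offset counts as one observed trigger.
    class CountingSource {
      val numTriggers = new AtomicInteger(0)
      def getOffset: Option[Long] = {
        numTriggers.incrementAndGet()
        None // "no data", like the test's empty stream
      }
    }

    // Simplified form of ScalaTest's eventually(timeout(...)): retry the
    // assertion until it passes or the deadline expires.
    def eventually(timeoutMillis: Long)(assertion: => Unit): Unit = {
      val deadline = System.currentTimeMillis() + timeoutMillis
      while (true) {
        try { assertion; return }
        catch {
          case e: AssertionError =>
            if (System.currentTimeMillis() > deadline) throw e
            Thread.sleep(10)
        }
      }
    }

    def main(args: Array[String]): Unit = {
      val source = new CountingSource
      // A background "trigger loop" standing in for the streaming engine.
      val engine = new Thread(new Runnable {
        override def run(): Unit = (1 to 200).foreach { _ => source.getOffset; Thread.sleep(1) }
      })
      engine.start()
      // Deterministic wait: observe >100 real triggers instead of sleeping a
      // fixed amount and hoping, as the removed Thread.sleep(10) loop did.
      eventually(timeoutMillis = 10000) { assert(source.numTriggers.get > 100) }
      engine.join()
    }
  }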
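On the tightened assertion numProgressEvent >= 1 && numProgressEvent <= 11: the diff's comment only states that 11 is the maximum, but the arithmetic plausibly runs as follows (this derivation is an inference, not spelled out in the commit). The 100 AdvanceManualClock(10) actions advance the manual clock by 1000 ms in total; with STREAMING_NO_DATA_EVENT_INTERVAL at 100 ms, the query may emit at most 1000 / 100 = 10 no-data progress events, plus one initial event, hence 11. The >= 1 half guards against the opposite regression: a query that reports no progress at all. As a worked check:

  import scala.concurrent.duration._

  object ProgressEventBound extends App {
    val triggerAdvance = 10.millis         // one AdvanceManualClock(10)
    val advances      = 100                // the for-loop in the test
    val noDataInterval = 100.millis        // STREAMING_NO_DATA_EVENT_INTERVAL

    val totalClockTime  = triggerAdvance * advances        // 1000 milliseconds
    val maxNoDataEvents = totalClockTime / noDataInterval  // 10.0
    // +1 for the initial progress event (assumed; the diff only states the bound).
    println(s"max progress events = ${maxNoDataEvents.toLong + 1}") // 11
  }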
