Skip to content

Commit

Permalink
[SPARK-26046][SS] Add StreamingQueryManager.listListeners()
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add a listListeners() method to StreamingQueryManager that lists all StreamingQueryListeners that have been added to that manager.

### Why are the changes needed?

While it's best practice to keep handles on all listeners added, it's still nice to have an API to be able to list what listeners have been added to a StreamingQueryManager.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Modified existing unit tests to use the new API instead of using reflection.

Closes #25518 from mukulmurthy/26046-listener.

Authored-by: Mukul Murthy <mukul.murthy@gmail.com>
Signed-off-by: Jose Torres <torres.joseph.f+github@gmail.com>
  • Loading branch information
mukulmurthy authored and jose-torres committed Sep 5, 2019
1 parent 151b954 commit 3929d16
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
Expand Up @@ -21,6 +21,7 @@ import java.util.UUID
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -199,6 +200,15 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
listenerBus.removeListener(listener)
}

/**
* List all [[StreamingQueryListener]]s attached to this [[StreamingQueryManager]].
*
* @since 3.0.0
*/
def listListeners(): Array[StreamingQueryListener] = {
listenerBus.listeners.asScala.toArray
}

/** Post a listener event */
private[sql] def postListenerEvent(event: StreamingQueryListener.Event): Unit = {
listenerBus.post(event)
Expand Down
Expand Up @@ -47,7 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
after {
spark.streams.active.foreach(_.stop())
assert(spark.streams.active.isEmpty)
assert(addedListeners().isEmpty)
assert(spark.streams.listListeners().isEmpty)
// Make sure we don't leak any events to the next test
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
}
Expand Down Expand Up @@ -223,7 +223,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(isListenerActive(listener1) === false)
assert(isListenerActive(listener2))
} finally {
addedListeners().foreach(spark.streams.removeListener)
spark.streams.listListeners().foreach(spark.streams.removeListener)
}
}

Expand Down Expand Up @@ -362,10 +362,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(session1.streams.ne(session2.streams))

withListenerAdded(collector1, session1) {
assert(addedListeners(session1).nonEmpty)
assert(session1.streams.listListeners().nonEmpty)

withListenerAdded(collector2, session2) {
assert(addedListeners(session2).nonEmpty)
assert(session2.streams.listListeners().nonEmpty)

// query on session1 should send events only to collector1
runQuery(session1)
Expand Down Expand Up @@ -440,13 +440,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}

private def addedListeners(session: SparkSession = spark): Array[StreamingQueryListener] = {
val listenerBusMethod =
PrivateMethod[StreamingQueryListenerBus]('listenerBus)
val listenerBus = session.streams invokePrivate listenerBusMethod()
listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
}

/** Collects events from the StreamingQueryListener for testing */
class EventCollector extends StreamingQueryListener {
// to catch errors in the async listener events
Expand Down

0 comments on commit 3929d16

Please sign in to comment.