Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-24309][CORE] AsyncEventQueue should stop on interrupt. #21356

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,7 +34,11 @@ import org.apache.spark.util.Utils
* Delivery will only begin when the `start()` method is called. The `stop()` method should be
* called when no more events need to be delivered.
*/
private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
private class AsyncEventQueue(
val name: String,
conf: SparkConf,
metrics: LiveListenerBusMetrics,
bus: LiveListenerBus)
extends SparkListenerBus
with Logging {

Expand Down Expand Up @@ -81,23 +85,18 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
}

private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
try {
var next: SparkListenerEvent = eventQueue.take()
while (next != POISON_PILL) {
val ctx = processingTime.time()
try {
super.postToAll(next)
} finally {
ctx.stop()
}
eventCount.decrementAndGet()
next = eventQueue.take()
var next: SparkListenerEvent = eventQueue.take()
while (next != POISON_PILL) {
val ctx = processingTime.time()
try {
super.postToAll(next)
} finally {
ctx.stop()
}
eventCount.decrementAndGet()
} catch {
case ie: InterruptedException =>
logInfo(s"Stopping listener queue $name.", ie)
next = eventQueue.take()
}
eventCount.decrementAndGet()
}

override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
Expand Down Expand Up @@ -130,7 +129,11 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
eventCount.incrementAndGet()
eventQueue.put(POISON_PILL)
}
dispatchThread.join()
// this thread might be trying to stop itself as part of error handling -- we can't join
// in that case.
if (Thread.currentThread() != dispatchThread) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's ok to leave this, but this doesn't happen anymore, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does still happen, we need this. We see the interrupt in postToAll, which is in the queue thread. If it fails, we call removeListenerOnError. If that results in the queue being empty, we stop the queue.

"spark-listener-group-eventLog" #20 daemon prio=5 os_prio=31 tid=0x00007f831379e800 nid=0x6303 in Object.wait() [0x0000000129226000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000078047ae28> (a org.apache.spark.scheduler.AsyncEventQueue$$anon$1)
        at java.lang.Thread.join(Thread.java:1245)
        - locked <0x000000078047ae28> (a org.apache.spark.scheduler.AsyncEventQueue$$anon$1)
        at java.lang.Thread.join(Thread.java:1319)
        at org.apache.spark.scheduler.AsyncEventQueue.stop(AsyncEventQueue.scala:135)
        at org.apache.spark.scheduler.LiveListenerBus$$anonfun$removeListener$2.apply(LiveListenerBus.scala:123)
        at org.apache.spark.scheduler.LiveListenerBus$$anonfun$removeListener$2.apply(LiveListenerBus.scala:121)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.LiveListenerBus.removeListener(LiveListenerBus.scala:121)
        - locked <0x0000000780475fe8> (a org.apache.spark.scheduler.LiveListenerBus)
        at org.apache.spark.scheduler.AsyncEventQueue.removeListenerOnError(AsyncEventQueue.scala:196)
        at org.apache.spark.scheduler.AsyncEventQueue.removeListenerOnError(AsyncEventQueue.scala:37)
        at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:101)

dispatchThread.join()
}
}

def post(event: SparkListenerEvent): Unit = {
Expand Down Expand Up @@ -187,6 +190,12 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
true
}

override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
// the listener failed in an unrecoverable way, we want to remove it from the entire
// LiveListenerBus (potentially stopping a queue if it's empty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's

bus.removeListener(listener)
}

}

private object AsyncEventQueue {
Expand Down
Expand Up @@ -102,7 +102,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
queue.addListener(listener)

case None =>
val newQueue = new AsyncEventQueue(queue, conf, metrics)
val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
newQueue.addListener(listener)
if (started.get()) {
newQueue.start(sparkContext)
Expand All @@ -111,6 +111,12 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
}
}

private[scheduler] def removeQueue(queue: String): Unit = synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not used anymore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove it?

queues.asScala.find(_.name == queue).foreach { q =>
queues.remove(q)
}
}

def removeListener(listener: SparkListenerInterface): Unit = synchronized {
// Remove listener from all queues it was added to, and stop queues that have become empty.
queues.asScala
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ListenerBus.scala
Expand Up @@ -60,6 +60,15 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
}
}

/**
 * Removes a listener as part of error handling. By default this just delegates to
 * `removeListener`.
 *
 * Subclasses can override this if there is any extra cleanup to do when removing a listener.
 * In particular, AsyncEventQueues can clean up queues in the LiveListenerBus.
 *
 * @param listener the listener to remove from this bus
 */
def removeListenerOnError(listener: L): Unit = removeListener(listener)


/**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
Expand All @@ -80,6 +89,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
}
try {
doPostEvent(listener, event)
if (Thread.interrupted()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ok right now since Spark code never explicitly interrupts these threads. If we ever need to do that, though, this might become a problem... but in that case I don't know how you'd handle this issue without just giving up and stopping everything.

But... is this correct? Your test just calls Thread.interrupt(), which just sets the interrupt flag. But what if the listener calls an operation that would actually turn that into an InterruptedException? You're not catching that here anywhere.

scala> Thread.currentThread().interrupt()

scala> Thread.sleep(10)
java.lang.InterruptedException: sleep interrupted
  at java.lang.Thread.sleep(Native Method)
  ... 33 elided

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah agree, I will handle this too -- but I'll wait to update till there is agreement on the right overall approach

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ok right now since Spark code never explicitly interrupts these threads. If we ever need to do that, though, this might become a problem... but in that case I don't know how you'd handle this issue without just giving up and stopping everything.

If Spark were to explicitly interrupt, then I think we'd also set some other flag indicating a reason, e.g. val requestedQueueStop: AtomicBoolean, so it shouldn't be hard to distinguish.

I've pushed an update to handle InterruptedException as well.

logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
s"Removing that listener.")
removeListenerOnError(listener)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the thread state is interrupted, is it still safe to keep this queue running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread.interrupted() also clears the interrupted state. So that alone isn't a problem -- we're basically declaring that we've handled the interrupt and nobody else gets to know about it anymore.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could just throw InterruptedException here to avoid duplicating the error handling code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}
} catch {
case NonFatal(e) if !isIgnorableException(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
Expand Down
Expand Up @@ -489,6 +489,45 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
}

test("interrupt within listener is handled correctly") {
  val sparkConf = new SparkConf(false).set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
  val listenerBus = new LiveListenerBus(sparkConf)

  // Two well-behaved counters and two listeners that interrupt their own dispatch thread.
  val sharedCounter = new BasicJobCounter()
  val statusCounter = new BasicJobCounter()
  val sharedInterrupter = new InterruptingListener
  val eventLogInterrupter = new InterruptingListener

  // Every posted event is an identical "job 0 succeeded" notification.
  def jobEndEvent: SparkListenerJobEnd = SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)

  // The shared queue mixes a good and a bad listener; the event log queue holds only a bad one.
  listenerBus.addToSharedQueue(sharedCounter)
  listenerBus.addToSharedQueue(sharedInterrupter)
  listenerBus.addToStatusQueue(statusCounter)
  listenerBus.addToEventLogQueue(eventLogInterrupter)
  assert(listenerBus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
  assert(listenerBus.findListenersByClass[BasicJobCounter]().size === 2)
  assert(listenerBus.findListenersByClass[InterruptingListener]().size === 2)

  listenerBus.start(mockSparkContext, mockMetricsSystem)

  // A single event is enough: both interrupting listeners should be dropped, and the event
  // log queue, left without listeners, should be removed as well.
  listenerBus.post(jobEndEvent)
  listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
  assert(listenerBus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
  assert(listenerBus.findListenersByClass[BasicJobCounter]().size === 2)
  assert(listenerBus.findListenersByClass[InterruptingListener]().size === 0)
  assert(sharedCounter.count === 1)
  assert(statusCounter.count === 1)

  // Further events should still be delivered by the queues that remain.
  for (_ <- 0 until 5) {
    listenerBus.post(jobEndEvent)
  }
  listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
  assert(sharedCounter.count === 6)
  assert(statusCounter.count === 6)

  // Stopping puts a poison pill in every active queue. If the interrupted queue were still
  // active, its queue would be full and this would fail.
  listenerBus.stop()
}

/**
* Assert that the given list of numbers has an average that is greater than zero.
*/
Expand Down Expand Up @@ -547,6 +586,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { throw new Exception }
}

/**
 * A minimal listener whose only reaction to a job-end event is to interrupt the thread that
 * delivered the event.
 */
private class InterruptingListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    Thread.currentThread().interrupt()
}
}

// These classes can't be declared inside of the SparkListenerSuite class because we don't want
Expand Down