Skip to content

Commit

Permalink
handle InterruptedException as well
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed May 21, 2018
1 parent 4a1f657 commit 09d55af
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 37 deletions.
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ListenerBus.scala
Expand Up @@ -95,6 +95,10 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
removeListenerOnError(listener)
}
} catch {
case ie: InterruptedException =>
logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
s"Removing that listener.", ie)
removeListenerOnError(listener)
case NonFatal(e) if !isIgnorableException(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
} finally {
Expand Down
Expand Up @@ -489,43 +489,46 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
}

test("interrupt within listener is handled correctly") {
val conf = new SparkConf(false)
.set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
val bus = new LiveListenerBus(conf)
val counter1 = new BasicJobCounter()
val counter2 = new BasicJobCounter()
val interruptingListener1 = new InterruptingListener
val interruptingListener2 = new InterruptingListener
bus.addToSharedQueue(counter1)
bus.addToSharedQueue(interruptingListener1)
bus.addToStatusQueue(counter2)
bus.addToEventLogQueue(interruptingListener2)
assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
assert(bus.findListenersByClass[InterruptingListener]().size === 2)

bus.start(mockSparkContext, mockMetricsSystem)

// after we post one event, both interrupting listeners should get removed, and the
// event log queue should be removed
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
assert(bus.findListenersByClass[InterruptingListener]().size === 0)
assert(counter1.count === 1)
assert(counter2.count === 1)
// Run the same scenario twice: once where the listener throws InterruptedException
// directly, and once where it merely sets the current thread's interrupt flag.
// In both cases the bus must remove the offending listener (and the queue that is
// left with no listeners) and keep processing events for the healthy listeners.
Seq(true, false).foreach { throwInterruptedException =>
// Suffix distinguishes the two generated test names in the suite report.
val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
test(s"interrupt within listener is handled correctly: $suffix") {
val conf = new SparkConf(false)
.set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
val bus = new LiveListenerBus(conf)
val counter1 = new BasicJobCounter()
val counter2 = new BasicJobCounter()
// Two misbehaving listeners: one sharing a queue with counter1, and one that is
// the ONLY listener on the event log queue (so that queue ends up empty).
val interruptingListener1 = new InterruptingListener(throwInterruptedException)
val interruptingListener2 = new InterruptingListener(throwInterruptedException)
bus.addToSharedQueue(counter1)
bus.addToSharedQueue(interruptingListener1)
bus.addToStatusQueue(counter2)
bus.addToEventLogQueue(interruptingListener2)
assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
assert(bus.findListenersByClass[InterruptingListener]().size === 2)

bus.start(mockSparkContext, mockMetricsSystem)

// after we post one event, both interrupting listeners should get removed, and the
// event log queue should be removed
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
assert(bus.findListenersByClass[InterruptingListener]().size === 0)
// Both counters saw the first event before/independently of the interrupts.
assert(counter1.count === 1)
assert(counter2.count === 1)

// posting more events should be fine, they'll just get processed from the OK queue.
(0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter1.count === 6)
assert(counter2.count === 6)

// Make sure stopping works -- this requires putting a poison pill in all active queues, which
// would fail if our interrupted queue was still active, as its queue would be full.
bus.stop()
}
}

/**
Expand Down Expand Up @@ -589,9 +592,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
/**
* A simple listener that interrupts on job end.
*/
// Takes throwInterruptedException so each test variant can choose HOW the interrupt
// is surfaced: thrown as an InterruptedException, or left as the thread's interrupt
// status for the listener bus to discover.
private class InterruptingListener(val throwInterruptedException: Boolean) extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    if (throwInterruptedException) {
      throw new InterruptedException("got interrupted")
    } else {
      // Only set the interrupt flag; don't throw. The bus must still notice it.
      Thread.currentThread().interrupt()
    }
  }
}
}
Expand Down

0 comments on commit 09d55af

Please sign in to comment.