diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 577c4103d3827..95c7a858944ff 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -95,6 +95,10 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
           removeListenerOnError(listener)
         }
       } catch {
+        case ie: InterruptedException =>
+          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
+            s"Removing that listener.", ie)
+          removeListenerOnError(listener)
         case NonFatal(e) if !isIgnorableException(e) =>
           logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
       } finally {
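For orientation, here is a minimal standalone sketch of the dispatch pattern the hunk above completes. The pre-existing branch whose tail is visible in the context lines already removes a listener after a `Thread.interrupted()` check; the new `case ie: InterruptedException` covers a listener that throws outright. Everything below (`ToyBus`, `Listener`, `onEvent`) is an illustrative stand-in, not Spark's API:

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Illustrative stand-in types; only the control flow mirrors ListenerBus.postToAll.
trait Listener { def onEvent(event: String): Unit }

class ToyBus {
  private val listeners = ArrayBuffer[Listener]()

  def add(listener: Listener): Unit = listeners += listener

  def postToAll(event: String): Unit = {
    // Iterate over a snapshot so that removing a listener mid-dispatch is safe.
    for (listener <- listeners.toList) {
      try {
        listener.onEvent(event)
        // The listener may have set the thread's interrupt flag instead of
        // throwing; Thread.interrupted() both checks and clears that flag.
        if (Thread.interrupted()) {
          println(s"$listener left the thread interrupted; removing it")
          listeners -= listener
        }
      } catch {
        case ie: InterruptedException =>
          // Mirrors the new catch clause: drop the offending listener so the
          // shared dispatch thread survives and the remaining listeners run.
          println(s"Interrupted while posting to $listener; removing it ($ie)")
          listeners -= listener
        case NonFatal(e) =>
          println(s"Listener $listener threw an exception: $e")
      }
    }
  }
}

object ToyBusDemo extends App {
  val bus = new ToyBus
  bus.add(_ => throw new InterruptedException("boom")) // dropped on first post
  bus.add(event => println(s"ok listener saw $event"))
  bus.postToAll("event-1") // interrupting listener logged and removed
  bus.postToAll("event-2") // only the healthy listener remains
}
```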
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 9723e7d9379eb..6ffd1e84f7adb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -489,43 +489,46 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
     assert(bus.findListenersByClass[BasicJobCounter]().isEmpty)
   }
 
-  test("interrupt within listener is handled correctly") {
-    val conf = new SparkConf(false)
-      .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
-    val bus = new LiveListenerBus(conf)
-    val counter1 = new BasicJobCounter()
-    val counter2 = new BasicJobCounter()
-    val interruptingListener1 = new InterruptingListener
-    val interruptingListener2 = new InterruptingListener
-    bus.addToSharedQueue(counter1)
-    bus.addToSharedQueue(interruptingListener1)
-    bus.addToStatusQueue(counter2)
-    bus.addToEventLogQueue(interruptingListener2)
-    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
-    assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
-    assert(bus.findListenersByClass[InterruptingListener]().size === 2)
-
-    bus.start(mockSparkContext, mockMetricsSystem)
-
-    // after we post one event, both interrupting listeners should get removed, and the
-    // event log queue should be removed
-    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
-    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
-    assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
-    assert(bus.findListenersByClass[InterruptingListener]().size === 0)
-    assert(counter1.count === 1)
-    assert(counter2.count === 1)
+  Seq(true, false).foreach { throwInterruptedException =>
+    val suffix = if (throwInterruptedException) "throw interrupt" else "set Thread interrupted"
+    test(s"interrupt within listener is handled correctly: $suffix") {
+      val conf = new SparkConf(false)
+        .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5)
+      val bus = new LiveListenerBus(conf)
+      val counter1 = new BasicJobCounter()
+      val counter2 = new BasicJobCounter()
+      val interruptingListener1 = new InterruptingListener(throwInterruptedException)
+      val interruptingListener2 = new InterruptingListener(throwInterruptedException)
+      bus.addToSharedQueue(counter1)
+      bus.addToSharedQueue(interruptingListener1)
+      bus.addToStatusQueue(counter2)
+      bus.addToEventLogQueue(interruptingListener2)
+      assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE))
+      assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+      assert(bus.findListenersByClass[InterruptingListener]().size === 2)
 
-    // posting more events should be fine, they'll just get processed from the OK queue.
-    (0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
-    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(counter1.count === 6)
-    assert(counter2.count === 6)
+      bus.start(mockSparkContext, mockMetricsSystem)
 
-    // Make sure stopping works -- this requires putting a poison pill in all active queues, which
-    // would fail if our interrupted queue was still active, as its queue would be full.
-    bus.stop()
+      // after we post one event, both interrupting listeners should get removed, and the
+      // event log queue should be removed
+      bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+      bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+      assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE))
+      assert(bus.findListenersByClass[BasicJobCounter]().size === 2)
+      assert(bus.findListenersByClass[InterruptingListener]().size === 0)
+      assert(counter1.count === 1)
+      assert(counter2.count === 1)
+
+      // posting more events should be fine, they'll just get processed from the OK queue.
+      (0 until 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
+      bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+      assert(counter1.count === 6)
+      assert(counter2.count === 6)
+
+      // Make sure stopping works -- this requires putting a poison pill in all active queues, which
+      // would fail if our interrupted queue was still active, as its queue would be full.
+      bus.stop()
+    }
   }
 
   /**
@@ -589,9 +592,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
   /**
    * A simple listener that interrupts on job end.
    */
-  private class InterruptingListener extends SparkListener {
+  private class InterruptingListener(val throwInterruptedException: Boolean) extends SparkListener {
     override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
-      Thread.currentThread().interrupt()
+      if (throwInterruptedException) {
+        throw new InterruptedException("got interrupted")
+      } else {
+        Thread.currentThread().interrupt()
+      }
     }
   }
 }
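The `Seq(true, false)` parametrization makes the suite exercise both ways an interrupt can surface inside a listener: throwing `InterruptedException` directly, or only setting the thread's interrupt flag, which a later blocking call converts into a thrown (and flag-clearing) `InterruptedException`. A toy demonstration of the JDK semantics involved, not Spark code (the `LinkedBlockingQueue` here is just a stand-in for whatever blocking call the dispatch thread makes next):

```scala
import java.util.concurrent.LinkedBlockingQueue

object InterruptModes extends App {
  // Mode 1 ("throw interrupt"): the exception is thrown and caught directly.
  try throw new InterruptedException("got interrupted")
  catch { case ie: InterruptedException => println(s"caught directly: $ie") }

  // Mode 2 ("set Thread interrupted"): only the flag is set; nothing is thrown yet.
  Thread.currentThread().interrupt()
  println(s"flag set, still running: ${Thread.currentThread().isInterrupted}")

  // ...until a blocking call notices the pending interrupt and throws,
  // clearing the flag as it does so.
  val queue = new LinkedBlockingQueue[Int]()
  try queue.take()
  catch {
    case ie: InterruptedException =>
      println(s"take() threw: $ie; flag now: ${Thread.currentThread().isInterrupted}")
  }
}
```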