[SPARK-24309][CORE] AsyncEventQueue should stop on interrupt. #21356
Conversation
EventListeners can interrupt the event queue thread. In particular, when the EventLoggingListener writes to HDFS, HDFS can interrupt the thread. When there is an interrupt, the queue should be removed and stop accepting any more events. Before this change, the queue would continue to take more events (until it was full), and then would not stop when the application was complete because the PoisonPill couldn't be added. Added a unit test which failed before this change.
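For orientation, here is a simplified sketch of the dispatch-loop behavior this change is after (illustrative only, not the exact diff; the field names follow the real AsyncEventQueue):

// Sketch: on InterruptedException the queue marks itself stopped, so it
// rejects further events instead of filling up and blocking the PoisonPill
// at shutdown.
private def dispatch(): Unit = {
  try {
    var next = eventQueue.take()
    while (next != POISON_PILL) {
      super.postToAll(next)
      next = eventQueue.take()
    }
  } catch {
    case ie: InterruptedException =>
      logInfo(s"Stopping listener queue $name.", ie)
      stopped.set(true)   // stop accepting events after an interrupt
  }
}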
@@ -34,7 +34,8 @@ import org.apache.spark.util.Utils
  * Delivery will only begin when the `start()` method is called. The `stop()` method should be
  * called when no more events need to be delivered.
  */
-private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics)
+private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics,
one param per line now that they don't fit
bus.start(mockSparkContext, mockMetricsSystem)

// after we post one event, the shared queue should get stopped because of the interrupt
s/get stopped/stop
@@ -97,6 +98,11 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
     } catch {
       case ie: InterruptedException =>
         logInfo(s"Stopping listener queue $name.", ie)
+        stopped.set(true)
I wonder if this is the right thing to do. What would the old bus do here?
I remember there's code to catch exceptions from listeners and just log them, so that other listeners in the same queue are not affected. Not that the event writer queue would suffer from that, but others might.
The old bus would stop the entire spark context (details below).
I dunno what the right behavior is either -- I figured this was the intention given the logInfo. Alternatively we could (a) stop the entire spark context, (b) skip this particular event and keep going or (c) stop the one listener which happened to be active on the interrupt, but keep the queue active (if there were more listeners).
More details on the 2.2 behavior: ListenerBus.postToAll wouldn't catch the exception, and the polling thread in LiveListenerBus wraps everything in Utils.tryOrStopSparkContext.
I did a similar test on branch-2.2: squito@72951bd
18/05/17 21:38:23.446 SparkListenerBus ERROR Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:80)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1282)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
18/05/17 21:38:23.448 SparkListenerBus ERROR Utils: throw uncaught fatal error in thread SparkListenerBus
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:80)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1282)
at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77)
Test build #90753 has finished for PR 21356 at commit
Does this mean a problematic listener can kill the queue and crash other listeners in the same queue? Shall we do some isolation? Not introduced by this PR, but I'm wondering why we only catch
also cc @JoshRosen @zsxwing
Yeah, I think Marcelo was asking about this above in #21356 (comment). Of course, we can't stop a listener from causing an OOM or launching 20 threads and hogging the CPU, but we could try for a little more isolation ...
NonFatal is already handled inside
Test build #90769 has finished for PR 21356 at commit
Since
As an alternative design option, PoisonPill could be handled differently, since some messages should have higher priority and could be considered part of your "control plane".
I pushed an update which only removes the listener which was active at the interrupt. Note that this is not necessarily the same thing as the listener which caused the interrupt -- we have no idea who really caused the interrupt, but I guess it would be rather contrived for another thread to interrupt the event queue threads. I'm not sure if I should add some more guards against interrupts as well -- e.g. check the interrupted status before posting the event to a specific listener and throw InterruptedException if it's set, while leaving the generic handling of InterruptedException in AsyncEventQueue, to shut the queue down entirely if there is any interrupt outside of posting to a listener.
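A minimal sketch of what that update does (simplified; the actual change is quoted later in this review, and the NonFatal branch here is the pre-existing per-listener error handling):

// Sketch of the per-listener handling in ListenerBus.postToAll: if the
// interrupt flag was set while posting to one listener, clear it and remove
// only that listener; the queue keeps running for the remaining listeners.
try {
  doPostEvent(listener, event)
  if (Thread.interrupted()) {
    logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
      s"Removing that listener.")
    removeListenerOnError(listener)
  }
} catch {
  case NonFatal(e) =>
    logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
}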
I'm pretty sure the existing use of PoisonPill is intentionally put at the end of the queue, so that everything is processed before the queue is shut down.
Yes, I guessed that; I was just thinking that knowing it's time to shut down is one thing and consuming all input is another, anyway.
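For context, a minimal, generic sketch of the poison-pill shutdown pattern being discussed (illustrative only, not the actual AsyncEventQueue code; the names are made up): the sentinel is put on the queue after all real events, so the dispatch thread drains everything before exiting.

import java.util.concurrent.LinkedBlockingQueue

object PoisonPillSketch {
  private val POISON_PILL = new Object()              // sentinel marking "no more events"
  private val queue = new LinkedBlockingQueue[AnyRef]()

  def post(event: AnyRef): Unit = queue.put(event)    // producers add real events

  def stop(): Unit = queue.put(POISON_PILL)           // placed after everything already queued

  def dispatchLoop(): Unit = {
    var next = queue.take()
    while (next ne POISON_PILL) {                     // drain all real events first
      // ... deliver `next` to the listeners here ...
      next = queue.take()
    }
    // only reached once every event queued before stop() has been processed
  }
}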
@squito Could you clarify this? This sounds like a bug in HDFS. I'm curious when this will happen. Although we don't interrupt the AsyncEventQueue thread in Spark right now, I think it's still better to respect InterruptedException rather than ignoring it, e.g., we may decide to interrupt the thread in future.
So, to clarify a little more what the previous code did: if we just wanted to emulate the old behavior we could do that, although I agree it's better to try not to. It's weird that HDFS would be throwing that exception in the caller thread, but at the same time, it's an issue we have to work around: even if they fix HDFS, there are just so many versions out there that a fix wouldn't matter much.
@vanzin I just want to understand the real issue here to make sure the current solution is correct.
@@ -187,6 +190,12 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
     true
   }

+  override def removeListenerOnError(listener: SparkListenerInterface): Unit = {
+    // the listener failed in an unrecoverably way, we want to remove it from the entire
+    // LiveListenerBus (potentially stopping a queue if its empty)
it's
@@ -111,6 +111,12 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
     }
   }

+  private[scheduler] def removeQueue(queue: String): Unit = synchronized {
Not used anymore.
can we remove it?
@@ -80,6 +89,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
       }
       try {
         doPostEvent(listener, event)
+        if (Thread.interrupted()) {
This is ok right now since Spark code never explicitly interrupts these threads. If we ever need to do that, though, this might become a problem... but in that case I don't know how you'd handle this issue without just giving up and stopping everything.
But... is this correct? Your test just calls Thread.interrupt(), which just sets the interrupt flag. But what if the listener calls an operation that would actually turn that into an InterruptedException? You're not catching that here anywhere.
scala> Thread.currentThread().interrupt()
scala> Thread.sleep(10)
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
... 33 elided
Yeah, agreed, I will handle this too -- but I'll wait to update until there is agreement on the right overall approach.
This is ok right now since Spark code never explicitly interrupts these threads. If we ever need to do that, though, this might become a problem... but in that case I don't know how you'd handle this issue without just giving up and stopping everything.
If Spark were to explicitly interrupt, then I think we'd also set some other flag indicating a reason, e.g. val requestedQueueStop: AtomicBoolean, so it shouldn't be hard to distinguish.
I've pushed an update to handle InterruptedException as well.
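A rough sketch of what that hypothetical flag might look like (purely illustrative, not part of this PR; requestedQueueStop is only the name suggested above, and the class and methods here are made up):

import java.util.concurrent.atomic.AtomicBoolean

// Illustrative only: if Spark ever interrupted the dispatch thread on
// purpose, a flag like this could record why, letting the
// InterruptedException handler tell an intentional stop apart from a stray
// interrupt set by a listener.
class QueueStopSketch(dispatchThread: Thread) {
  private val requestedQueueStop = new AtomicBoolean(false)

  def requestStop(): Unit = {
    requestedQueueStop.set(true)   // record the reason first...
    dispatchThread.interrupt()     // ...then interrupt the dispatch thread
  }

  // Called from the dispatch thread's catch block for InterruptedException.
  def onInterrupt(): Unit = {
    if (requestedQueueStop.get()) {
      // Spark asked for the stop: shut the queue down cleanly.
    } else {
      // Unexpected interrupt (e.g. from a listener): handle as in this PR.
    }
  }
}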
@zsxwing Imran knows more since he debugged it, but I see a lot of code like this in Hadoop libraries:
Which could cause things like this. I don't know what would cause the exception in the first place, but the fact that the library is propagating the interrupt to the caller explains why Spark is seeing it in one of its own calls.
The code here is just propagating the interrupt state and is not doing anything wrong. @squito do you know what would cause this in the first place?
Test build #90795 has finished for PR 21356 at commit
The problem was not actually an interrupted exception from the listener, it was that the Thread's state was getting set to interrupted, and then there would be a failure later in
So really the question is what to do if a listener sets the interrupt state or throws an InterruptedException. But from all the discussion it's not that clear to me what we should do (other than, of course, preventing the queue from hanging and the application from stopping).
I think the behavior in your change (trying to keep things running) is fine. Since the queues are isolated now, there's no risk of bad listeners affecting important ones (like the ones used for dynamic allocation or building the UI) and causing cascading issues, which was a risk in the previous versions. |
Test build #90873 has finished for PR 21356 at commit
if (Thread.interrupted()) {
  logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
    s"Removing that listener.")
  removeListenerOnError(listener)
if the thread state is interrupted, is it still safe to keep this queue running?
Thread.interrupted() also clears the interrupted state. So that alone isn't a problem -- we're basically declaring that we've handled the interrupt and nobody else gets to know about it anymore.
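For reference, a quick REPL check of that behavior, in the same style as the snippet quoted above (interrupted() reports the flag and clears it as a side effect):

scala> Thread.currentThread().interrupt()

scala> Thread.interrupted()   // true, and the flag is cleared as a side effect
res0: Boolean = true

scala> Thread.interrupted()   // already cleared
res1: Boolean = false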
You could just throw InterruptedException here to avoid duplicating the error handling code.
+1
Looks good aside from the unused method.
Looks good pending tests.
// this thread might be trying to stop itself as part of error handling -- we can't join
// in that case.
if (Thread.currentThread() != dispatchThread) {
  dispatchThread.join()
}
it's ok to leave this, but this doesn't happen anymore, right?
It does still happen, we need this. We see the interrupt in postToAll, which is in the queue thread. If it fails, we call removeListenerOnError. If that results in the queue being empty, we stop the queue.
"spark-listener-group-eventLog" #20 daemon prio=5 os_prio=31 tid=0x00007f831379e800 nid=0x6303 in Object.wait() [0x0000000129226000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000078047ae28> (a org.apache.spark.scheduler.AsyncEventQueue$$anon$1)
at java.lang.Thread.join(Thread.java:1245)
- locked <0x000000078047ae28> (a org.apache.spark.scheduler.AsyncEventQueue$$anon$1)
at java.lang.Thread.join(Thread.java:1319)
at org.apache.spark.scheduler.AsyncEventQueue.stop(AsyncEventQueue.scala:135)
at org.apache.spark.scheduler.LiveListenerBus$$anonfun$removeListener$2.apply(LiveListenerBus.scala:123)
at org.apache.spark.scheduler.LiveListenerBus$$anonfun$removeListener$2.apply(LiveListenerBus.scala:121)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.LiveListenerBus.removeListener(LiveListenerBus.scala:121)
- locked <0x0000000780475fe8> (a org.apache.spark.scheduler.LiveListenerBus)
at org.apache.spark.scheduler.AsyncEventQueue.removeListenerOnError(AsyncEventQueue.scala:196)
at org.apache.spark.scheduler.AsyncEventQueue.removeListenerOnError(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:101)
Test build #90913 has finished for PR 21356 at commit
Test build #90914 has finished for PR 21356 at commit
Merging to master / 2.3.
EventListeners can interrupt the event queue thread. In particular, when the EventLoggingListener writes to hdfs, hdfs can interrupt the thread. When there is an interrupt, the queue should be removed and stop accepting any more events. Before this change, the queue would continue to take more events (till it was full), and then would not stop when the application was complete because the PoisonPill couldn't be added. Added a unit test which failed before this change. Author: Imran Rashid <irashid@cloudera.com> Closes #21356 from squito/SPARK-24309. (cherry picked from commit 3244707) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
EventListeners can interrupt the event queue thread. In particular, when the EventLoggingListener writes to hdfs, hdfs can interrupt the thread. When there is an interrupt, the queue should be removed and stop accepting any more events. Before this change, the queue would continue to take more events (till it was full), and then would not stop when the application was complete because the PoisonPill couldn't be added. Added a unit test which failed before this change. Author: Imran Rashid <irashid@cloudera.com> Closes apache#21356 from squito/SPARK-24309. (cherry-picked from commit 3244707) Ref: LIHADOOP-40197 RB=1414281 BUG=LIHADOOP-40197 G=superfriends-reviewers R=fli,mshen,yezhou,edlu A=yezhou