[SPARK-24294] Throw SparkException when OOM in BroadcastExchangeExec #21342
Conversation
Test build #90675 has finished for PR 21342 at commit
Test build #90676 has finished for PR 21342 at commit
cc @sameeragarwal @hvanhovell @cloud-fan @jiangxb1987
The change looks good.
LGTM, Thanks! I've seen variations of this bug in the past as well and am not aware of a better way to fix this issue.

scala> Await.result(Future(throw new RuntimeException("foo")), 50.seconds)
java.lang.RuntimeException: foo  // regular exceptions fail fast

scala> Await.result(Future(throw new OutOfMemoryError("foo")), 50.seconds)
java.util.concurrent.TimeoutException: Futures timed out after [50 seconds]  // OOME times out
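For reference, the REPL session above can be reproduced as a small standalone Scala program. This is only a sketch: the timeout behavior shown is what Scala 2.11/2.12 exhibit under scala/bug#9554; newer Scala versions may propagate the fatal error instead.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// A regular exception completes the future with a Failure, so Await fails fast.
val fast =
  try { Await.result(Future(throw new RuntimeException("foo")), 5.seconds); "completed" }
  catch { case e: RuntimeException => s"fail-fast: ${e.getMessage}" }

// A fatal error escapes the future body without completing the promise
// (scala/bug#9554), so on 2.11/2.12 Await.result blocks until the timeout.
val slow =
  try { Await.result(Future(throw new OutOfMemoryError("foo")), 2.seconds); "completed" }
  catch {
    case _: TimeoutException => "timed out"
    case t: Throwable        => s"propagated: ${t.getClass.getSimpleName}"
  }

println(fast)
println(slow)
```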
Could we hold this PR? This change sounds risky.
I'm also in favor of delaying for a couple of days for more detailed review because historically I think these types of changes have been high risk. The risk calculus might be a bit different if this was fixing a critical "wrong result" correctness bug, but it seems like this is a longstanding annoyance which causes either poor performance or visible job failures, so I don't see an extreme urgency to get this in immediately. Therefore let's take a bit of time to ponder things and sleep on it to be sure that we've thought through corner-cases (to be clear, I do think this patch is generally in a good direction). Some specifics:
Thanks a lot for looking into this. The issue is that sometimes a user would configure […]. Yes, there are implicit behavior changes in this pr. In the current change we throw a […]. If we really want to throw a […]
Basically, I agree with this two-step direction. However, I think that it is too risky to simply throw and catch […]. Can we throw and catch another Exception, or filter the caught […]?
How about we create a special Exception class to wrap OOM and InterruptedException, and say it's only used to work around the scala bug?
Test build #90846 has finished for PR 21342 at commit
@kiszk @cloud-fan
I will update the PR title & description if the change is in the right direction.
Test build #90847 has finished for PR 21342 at commit
retest this please
Test build #90853 has finished for PR 21342 at commit
@Private
public final class SparkOutOfMemoryException extends Exception {

  private OutOfMemoryError oe;
So OutOfMemoryError subclasses java.lang.Error, whereas this is subclassing from Exception; does this matter here?
@felixcheung thanks for the review.
In the current change there is no SparkOutOfMemoryException; I wrap the fatal Throwable in SparkFatalException.
@@ -111,12 +112,18 @@ case class BroadcastExchangeExec(
        SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
        broadcasted
      } catch {
        // SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we throw
        // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult
        // will catch this exception and re-throw the wrapped fatal throwable.
        case oe: OutOfMemoryError =>
Not related to this PR, but I'm a little worried about catching OOM here. Spark has SparkOutOfMemoryError, and it seems more reasonable to catch SparkOutOfMemoryError. This can be fixed in another PR.
Agreed; let's fix that separately.
To be clear, @cloud-fan, do you mean that, ideally inside relationFuture, all the OOM errors thrown are of type SparkOutOfMemoryError? (SparkOutOfMemoryError is a subclass of OutOfMemoryError.)
Yea, I think Spark will only throw SparkOutOfMemoryError, and we should only extend the error message for SparkOutOfMemoryError.
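As a sketch of the distinction being drawn here (class shape assumed for illustration, not Spark's exact definition): SparkOutOfMemoryError extends java.lang.OutOfMemoryError, so matching on it first separates the OOMs Spark throws deliberately from genuine JVM heap exhaustion.

```scala
// Hypothetical stand-in for org.apache.spark.memory.SparkOutOfMemoryError,
// which subclasses java.lang.OutOfMemoryError.
class SparkOutOfMemoryError(message: String) extends OutOfMemoryError(message)

// Match the more specific Spark-managed OOM before the generic JVM one.
def classify(t: Throwable): String = t match {
  case _: SparkOutOfMemoryError => "spark-managed OOM"
  case _: OutOfMemoryError      => "JVM OOM"
  case _                        => "other"
}

println(classify(new SparkOutOfMemoryError("unable to acquire execution memory")))
println(classify(new OutOfMemoryError("Java heap space")))
```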
Sure, I'm glad to take this and resolve it in another PR.
LGTM. Which Scala version has fixed this bug?
 * fatal throwable in {@link scala.concurrent.Future}'s body, and re-throw
 * SparkFatalException, which wraps the fatal throwable inside.
 */
private[spark] final class SparkFatalException(val throwable: Throwable) extends Exception
I believe it will not. The SparkFatalException has a short life cycle: it is created inside Future {} and then caught and stripped by ThreadUtils.awaitResult.
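That life cycle can be sketched end-to-end. The shapes below are assumptions for illustration, not Spark's exact code (the real awaitResult lives in org.apache.spark.util.ThreadUtils): the future body catches the fatal error and completes the promise with a SparkFatalException, and awaitResult strips the wrapper and rethrows the original throwable.

```scala
import scala.concurrent.{Await, Awaitable, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// A plain Exception, so the Future machinery treats it as non-fatal and
// completes the promise with it instead of letting the fatal error escape.
final class SparkFatalException(val throwable: Throwable) extends Exception

// Counterpart of ThreadUtils.awaitResult: strip the wrapper, rethrow the original.
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T =
  try Await.result(awaitable, atMost)
  catch { case e: SparkFatalException => throw e.throwable }

val relation = Future[String] {
  try throw new OutOfMemoryError("not enough memory to build the broadcast table")
  catch { case oe: OutOfMemoryError => throw new SparkFatalException(oe) }
}

// The OOM now fails fast instead of hanging until the future timeout.
val outcome =
  try { awaitResult(relation, 10.seconds); "completed" }
  catch { case oe: OutOfMemoryError => s"OOM rethrown: ${oe.getMessage}" }

println(outcome)
```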
Are there places where we fetch results from Futures without going through ThreadUtils.awaitResult? In other words, is that a narrow waist? Would it make sense to add a second redundant layer of unwrapping at the top of SparkUncaughtExceptionHandler to handle that case? Not sure yet, but just thinking aloud here.
OTOH I guess we're actually only using this in one place right now, so I think things are correct as written, but I was just kind of abstractly worrying about potential future pitfalls in case people start using this pattern in new code without also noticing the ThreadUtils.awaitResult requirement.
  case oe: OutOfMemoryError =>
-   throw new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
+   throw new SparkFatalException(
+     new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " +
Just curious: can we perform object operations (allocate an OutOfMemoryError, allocate and concatenate Strings) after we have caught an OutOfMemoryError? I think that we have space, since we failed to allocate a large object.
I agree that we're likely to have reclaimable space at this point, so the chance of a second OOM / failure here seems small. I'm pretty sure that the OutOfMemoryError being caught here often originates from Spark itself where we explicitly throw another OutOfMemoryError
at a lower layer of the system, in which case we still actually have heap to allocate strings. We should investigate and clean up that practice, but let's do that in a separate PR.
Thanks for the updates. The net change / scope of changes has been significantly reduced here, so I feel that this change is a lot less risky now. I left only one nitpicky comment at https://github.com/apache/spark/pull/21342/files#r189753488 worrying about potential future risks from people coming along and writing new code throwing SparkFatalException without going through ThreadUtils.awaitResult; one option is to have SparkUncaughtExceptionHandler (spark/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala, line 42 at commit 3244707) unwrap a SparkFatalException on top of an OOM and treat that as an OOM. Debatable if we want to do that, but it's a great way of addressing @gatorsmile's comment at #21342 (comment) and avoids future breakage.

Otherwise, LGTM.
https://issues.scala-lang.org/browse/SI-9554?orig=1 is still "OPEN", not sure which scala version can fix this bug. |
/**
 * SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we catch
 * fatal throwable in {@link scala.concurrent.Future}'s body, and re-throw
 * SparkFatalException, which wraps the fatal throwable inside.
Let's also mention that we must use ThreadUtils.awaitResult to run the Future, catch this exception, and re-throw the original exception.
Thanks again for the comments!
Test build #91039 has finished for PR 21342 at commit
Updated changes LGTM. Thanks for working on this!
Thanks! Merged to master.
Thanks for merging!
Scala 2.13.0-M5 and later have improved handling of InterruptedException, and fatal errors should be propagated; I'd welcome feedback on the new logic when possible. Thank you!
When OutOfMemoryError is thrown from BroadcastExchangeExec, scala.concurrent.Future will hit scala bug scala/bug#9554 and hang until the future times out. We could wrap the OOM inside SparkException to resolve this issue. Manually tested.

Author: jinxing <jinxing6042@126.com>

Closes apache#21342 from jinxing64/SPARK-24294.

(cherry-picked from commit b7a036b)
What changes were proposed in this pull request?
When OutOfMemoryError is thrown from BroadcastExchangeExec, scala.concurrent.Future will hit scala bug scala/bug#9554 and hang until the future times out.
We could wrap the OOM inside SparkException to resolve this issue.
How was this patch tested?
Manually tested.