[SPARK-17780][SQL]Report Throwable to user in StreamExecution #15352
cc @marmbrus
Test build #66343 has finished for PR 15352 at commit
@@ -207,7 +207,7 @@ class StreamExecution(
       })
     } catch {
       case _: InterruptedException if state == TERMINATED => // interrupted by stop()
-      case NonFatal(e) =>
+      case e if NonFatal(e) || e.isInstanceOf[NoClassDefFoundError] =>
Why only catch non-fatal (+ this one exception) here, given we are always going to record it and kill this thread? It seems better to always attempt to notify the user why we are dying.
Updated. Now it catches Throwable and rethrows it at the end if it's fatal.
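The catch-report-rethrow pattern described in this reply can be sketched roughly as follows. This is a minimal illustration, not the actual `StreamExecution` code: `ReportThenRethrow`, `runStream`, and `reported` are hypothetical names, and the real implementation also handles the `InterruptedException` case shown in the diff.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for the streaming thread body; not the real StreamExecution.
object ReportThenRethrow {
  // Last failure seen by the "stream"; assumed here to be what the user can inspect.
  @volatile var reported: Option[Throwable] = None

  def runStream(body: => Unit): Unit = {
    try {
      body
    } catch {
      case e: Throwable =>
        // Always record the cause so the user can find out why the stream died.
        reported = Some(e)
        // Fatal errors still propagate, so a Thread.UncaughtExceptionHandler can see them.
        if (!NonFatal(e)) {
          throw e
        }
    }
  }
}
```

With this shape, a `RuntimeException` is recorded and swallowed, while a `NoClassDefFoundError` is recorded and then rethrown to the caller.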
@@ -167,7 +167,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   /** Signals that a failure is expected and should not kill the test. */
   case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
     val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
-    override def toString(): String = s"ExpectFailure[${causeClass.getCanonicalName}]"
+    override def toString(): String = s"ExpectFailure[${causeClass.getName}]"
Use `getName`, as `getCanonicalName` returns null for anonymous classes.
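A small sketch of the difference between the two calls. `ClassNames`, `label`, and `canonical` are hypothetical helpers for illustration only; `label` mirrors the `toString` in the diff, and `canonical` wraps the possibly-null result in an `Option` so it can be inspected safely.

```scala
object ClassNames {
  // Mirrors the toString in the diff: getName does not return null.
  def label(c: Class[_]): String = s"ExpectFailure[${c.getName}]"

  // getCanonicalName may be null (for example for anonymous classes), so wrap it.
  def canonical(c: Class[_]): Option[String] = Option(c.getCanonicalName)

  def main(args: Array[String]): Unit = {
    val anon = new Runnable { def run(): Unit = () }
    println(label(anon.getClass))        // binary name of the anonymous class
    println(canonical(anon.getClass))    // canonical name, if the JVM reports one
    println(canonical(classOf[String]))  // a named top-level class for comparison
  }
}
```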
@@ -322,7 +322,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
       new UncaughtExceptionHandler {
         override def uncaughtException(t: Thread, e: Throwable): Unit = {
           streamDeathCause = e
-          testThread.interrupt()
Removed this line because the added test reaches this handler, and interrupting the test thread makes `awaitTermination` fail with `InterruptedException`. It's fine to remove it, since the worst case is just waiting 10 seconds for the test to fail.
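For illustration, a handler that only records the cause, without interrupting any other thread, might look like the sketch below. `RecordDeathCause` and `runAndRecord` are hypothetical names and this is not the `StreamTest` code; it only shows that the cause is available once the failing thread has terminated.

```scala
import java.lang.Thread.UncaughtExceptionHandler
import java.util.concurrent.atomic.AtomicReference

object RecordDeathCause {
  // Holds whatever killed the worker thread, or null if it finished normally.
  val streamDeathCause = new AtomicReference[Throwable](null)

  def runAndRecord(body: => Unit): Unit = {
    val worker = new Thread(new Runnable { def run(): Unit = body })
    worker.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        // Record only; nothing else is interrupted here.
        streamDeathCause.set(e)
      }
    })
    worker.start()
    worker.join() // the handler has run by the time the thread has terminated
  }
}
```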
Test build #66400 has finished for PR 15352 at commit
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
// handle them
if (!NonFatal(e)) {
  throw e
`StreamExecution.stop()` won't have been called if the microbatch thread arrives at this location in the code, and the user likely won't call it after seeing the exception. You should probably call the `stop` method on each of the sources before exiting the microbatch thread here.
@frreiss this is a separate issue. Could you submit a PR after this one gets merged, please?
Good catch @frreiss, seems we should call sources `stop()` in the `finally`.
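One possible shape for that follow-up, as a rough sketch only. The `Source` trait below is a hypothetical minimal interface with just a `stop()` method, and `runBatches` is an invented name; this is not the change that was eventually made in Spark.

```scala
import scala.util.control.NonFatal

object StopSourcesInFinally {
  // Hypothetical minimal source interface; only stop() matters for this sketch.
  trait Source { def stop(): Unit }

  def runBatches(sources: Seq[Source])(body: => Unit): Unit = {
    try {
      body
    } finally {
      // Runs whether the body finished normally or threw, including fatal errors.
      sources.foreach { s =>
        try s.stop()
        catch { case NonFatal(_) => () } // keep stopping the remaining sources
      }
    }
  }
}
```

Swallowing non-fatal failures from an individual `stop()` is a design choice made here so that one misbehaving source does not prevent the others from being stopped.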
LGTM, I'm going to merge this.
## What changes were proposed in this pull request?

When using an incompatible source for structured streaming, it may throw NoClassDefFoundError. It's better to just catch Throwable and report it to the user since the streaming thread is dying.

## How was this patch tested?

`test("NoClassDefFoundError from an incompatible source")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #15352 from zsxwing/SPARK-17780.

(cherry picked from commit 9a48e60)
Signed-off-by: Michael Armbrust <michael@databricks.com>
## What changes were proposed in this pull request?

When using an incompatible source for structured streaming, it may throw NoClassDefFoundError. It's better to just catch Throwable and report it to the user since the streaming thread is dying.

## How was this patch tested?

`test("NoClassDefFoundError from an incompatible source")`
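As background for why a `NonFatal(e)` pattern alone misses this case: `NoClassDefFoundError` is a `LinkageError`, which `scala.util.control.NonFatal` treats as fatal. The sketch below demonstrates this; `FatalCheck` and `classify` are hypothetical names used only for illustration.

```scala
import scala.util.control.NonFatal

object FatalCheck {
  // Classifies a throwable the same way a `case NonFatal(e)` pattern would.
  def classify(t: Throwable): String = t match {
    case NonFatal(_) => "non-fatal"
    case _           => "fatal"
  }

  def main(args: Array[String]): Unit = {
    println(classify(new RuntimeException("boom")))        // non-fatal
    println(classify(new NoClassDefFoundError("missing"))) // fatal
  }
}
```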