[SPARK-18908][SS] Creating StreamingQueryException should check if logicalPlan is created #16322
@@ -560,6 +589,9 @@ class StreamExecution(
  @volatile private var noNewData = false

  override def processAllAvailable(): Unit = {
    if (streamDeathCause != null) {
Just make `processAllAvailable` fail fast.
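The fail-fast idea can be sketched in isolation. This is a minimal illustration, not the code from this PR: `SketchQuery`, `markFailed`, and `deathCause` are hypothetical names standing in for `StreamExecution` and its `streamDeathCause` field.

```scala
// Hypothetical stand-in for a streaming query; all names are illustrative only.
class SketchQuery {
  // Set by the (simulated) stream thread when it dies.
  @volatile private var deathCause: Throwable = null

  def markFailed(cause: Throwable): Unit = { deathCause = cause }

  def processAllAvailable(): Unit = {
    // Fail fast: a dead query will never make progress, so do not block waiting for it.
    if (deathCause != null) throw deathCause
    // ... otherwise wait until all available data has been processed ...
  }
}
```

Without the check, a caller could block on a query whose thread has already died.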
Test build #70288 has finished for PR 16322.
Test build #70342 has started for PR 16322.
@@ -240,7 +241,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     val resetConfValues = mutable.Map[String, Option[String]]()

     @volatile
-    var streamDeathCause: Throwable = null
+    var streamThreadDeathCause: Throwable = null
Renamed it to avoid confusion with `StreamExecution.streamDeathCause`.
      }
    }
    _logicalPlan
  }
This is getting pretty complicated... Do we really need to include all of this information in `StreamingQueryException`?
> all of this information

Do you mean `logicalPlan`? It's used to generate the plan for each batch.
What I'm trying to say is, (if I understand correctly) all of this extra logic is to make sure that there aren't cycles when we fail to initialize and then try to use that uninitialized information in order to generate the exception.
I'm asking why that information needs to be in the exception at all? Or if we should just try to initialize earlier in the stream thread and throw a different exception when initialization fails.
I guess it follows `AnalysisException` in including as much information as possible. @tdas do you want to clarify it?
Sure, if we separate initialization failure from runtime failures though, then we can only include the extra information when it makes sense (i.e. when we have actually started running and have offsets, etc to report).
I agree with @marmbrus. We can include `logicalPlan` in the `toDebugString` only if the state allows, e.g. `microBatchThreadStatus = INITIALIZED`, and then we don't need any of this extra code.
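A minimal sketch of that suggestion, assuming a simple boolean flag rather than the `microBatchThreadStatus` state mentioned above. `DebugStringQuery`, `planInitialized`, and `describeFailure` are hypothetical names and not part of Spark.

```scala
// Hypothetical miniature: only touch the lazily built plan when it is known to exist.
class DebugStringQuery(buildPlan: () => String) {
  // Flipped only after the lazy plan has been built successfully.
  @volatile private var planInitialized = false

  private lazy val plan: String = {
    val built = buildPlan()
    planInitialized = true
    built
  }

  def initialize(): String = plan

  def toDebugString(includePlan: Boolean): String = {
    val base = s"initialized: $planInitialized"
    if (includePlan) s"$base, plan: $plan" else base
  }

  // Building the failure report never re-triggers plan creation.
  def describeFailure(e: Throwable): String =
    s"${e.getMessage} [${toDebugString(includePlan = planInitialized)}]"
}
```

Because the report consults the flag first, a failure during plan creation cannot fail a second time while the error message is being built.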
Or really, what I was suggesting was to have a separate try/catch early in the stream thread that forces initialization of anything that is lazy.
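One way to picture the "separate try/catch" idea is below. It is only a sketch under assumptions: `InitializationFailed` and `LazyPlanQuery` are hypothetical names, and the PR itself keeps wrapping errors in `StreamingQueryException` rather than adding a new exception type.

```scala
// Hypothetical exception type for failures that happen before the query starts running.
final class InitializationFailed(cause: Throwable)
  extends RuntimeException(s"Failed to initialize query: ${cause.getMessage}", cause)

class LazyPlanQuery(buildPlan: () => String) {
  // Lazily built, like the logical plan discussed in this thread.
  lazy val plan: String = buildPlan()

  def run(): String = {
    // Force everything lazy up front, in its own try/catch, so that an
    // initialization failure is reported separately from a runtime failure.
    val initialized =
      try plan
      catch { case e: Exception => throw new InitializationFailed(e) }
    // From here on the plan is known to exist; runtime errors can carry richer state.
    s"running $initialized"
  }
}
```

With this split, fields such as committed and available offsets would only need to be reported for failures that occur after the first block succeeds.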
Test build #70372 has finished for PR 16322.
Test build #70376 has finished for PR 16322.
Test build #70377 has finished for PR 16322.
Test build #70375 has finished for PR 16322.
Test build #70381 has finished for PR 16322.
@@ -282,7 +289,7 @@ class StreamExecution(
         updateStatusMessage("Stopped")
       case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
-          this,
+          toDebugString(includeLogicalPlan = logicalPlanInitilized),
           s"Query $prettyIdString terminated with exception: ${e.getMessage}",
           e,
           committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
I think the one possibly confusing thing is that we are still wrapping initialization errors with this class that has a lot of fields that don't make sense until the stream is initialized (i.e. committed and available offsets). I guess you did it this way because the function is `Option[StreamingQueryException]`? We might consider changing that, but others should chime in if they feel strongly.
Right. `StreamingQueryException` is a public API. I think it's better not to break it, considering 2.1.0 just passed.
@marmbrus updated as per our offline discussion. I also added a new method
Test build #70425 has finished for PR 16322.
@marmbrus should take a look because he had opinions on this.
LGTM
        }
      })
      currentStream.awaitInitialization(streamingTimeout.toMillis)
Can you document why this is needed?
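For readers unfamiliar with the pattern, waiting for initialization with a timeout can be sketched with a latch. This is an assumption-laden illustration, not the PR's implementation: `InitLatchQuery`, `initLatch`, and `deathCause` are hypothetical names.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}

// Hypothetical miniature of "wait until the stream thread has finished initializing".
class InitLatchQuery {
  private val initLatch = new CountDownLatch(1)
  @volatile private var deathCause: Throwable = null

  def start(init: () => Unit): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit = {
        try init()
        catch { case e: Exception => deathCause = e }
        finally initLatch.countDown() // released on success and on failure
      }
    })
    t.setDaemon(true)
    t.start()
    t
  }

  def awaitInitialization(timeoutMs: Long): Unit = {
    if (!initLatch.await(timeoutMs, TimeUnit.MILLISECONDS))
      throw new TimeoutException(s"Query not initialized within $timeoutMs ms")
    // Surface an initialization failure to the caller instead of hiding it.
    if (deathCause != null) throw deathCause
  }
}
```

A test harness that waits this way only proceeds once the query has either initialized or reported why it could not.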
@@ -559,7 +580,36 @@ class StreamExecution(
  /** A flag to indicate that a batch has completed with no new data available. */
  @volatile private var noNewData = false

  /**
   * Assert that the await APIs should not be called in the stream thread. Otherwise, it may cause
   * dead-lock.
Document what kind of deadlock can happen.
   */
  private def assertAwaitThread(): Unit = {
    if (microBatchThread eq Thread.currentThread) {
      throw new IllegalStateException("Cannot wait inside a stream thread")
cannot wait for a query state from the same thread that is running the query
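The deadlock in question is a thread waiting for its own completion. A small sketch, assuming a latch-based `awaitTermination`; `AwaitGuardQuery`, `worker`, and `terminated` are hypothetical names, and only the guard's shape mirrors the diff above.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical miniature: the worker thread must not wait for its own termination.
class AwaitGuardQuery {
  private val terminated = new CountDownLatch(1)
  @volatile private var worker: Thread = null

  private def assertAwaitThread(): Unit = {
    if (worker eq Thread.currentThread()) {
      throw new IllegalStateException(
        "Cannot wait for a query state from the same thread that is running the query")
    }
  }

  def awaitTermination(): Unit = {
    // Without this guard the worker would block on a latch that only it can release.
    assertAwaitThread()
    terminated.await()
  }

  def start(body: AwaitGuardQuery => Unit): Unit = {
    val t = new Thread(new Runnable {
      def run(): Unit = try body(AwaitGuardQuery.this) finally terminated.countDown()
    })
    worker = t // assigned before start() so the worker always sees it
    t.start()
  }
}
```

Calling `awaitTermination()` from inside `body` throws immediately, while other threads can still wait normally.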
@@ -228,7 +228,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
    */
   def testStream(
       _stream: Dataset[_],
-      outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = {
+      outputMode: OutputMode = OutputMode.Append,
+      ignoreThreadDeathCause: Boolean = false)(actions: StreamAction*): Unit = {
This `ignore` field is really hard to understand as a user of this function. And adding a field just for one test does not make sense. Rather, make an action `ExpectStreamThreadException`.
LGTM, pending tests
Test build #70502 has finished for PR 16322.
Thanks! Merging to master and 2.1.
…gicalPlan is created

## What changes were proposed in this pull request?

This PR audits places using `logicalPlan` in StreamExecution and ensures they all handles the case that `logicalPlan` cannot be created.

In addition, this PR also fixes the following issues in `StreamingQueryException`:
- `StreamingQueryException` and `StreamExecution` are cycle-dependent because in the `StreamingQueryException`'s constructor, it calls `StreamExecution`'s `toDebugString` which uses `StreamingQueryException`. Hence it will output `null` value in the error message.
- Duplicated stack trace when calling Throwable.printStackTrace because StreamingQueryException's toString contains the stack trace.

## How was this patch tested?

The updated `test("max files per trigger - incorrect values")`. I found this issue when I switched from `testStream` to the real codes to verify the failure in this test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16322 from zsxwing/SPARK-18907.

(cherry picked from commit ff7d82a)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
What changes were proposed in this pull request?

This PR audits places that use `logicalPlan` in StreamExecution and ensures they all handle the case where `logicalPlan` cannot be created.

In addition, this PR also fixes the following issues in `StreamingQueryException`:
- `StreamingQueryException` and `StreamExecution` are cycle-dependent because the `StreamingQueryException` constructor calls `StreamExecution`'s `toDebugString`, which in turn uses `StreamingQueryException`. Hence it will output a `null` value in the error message.
- Duplicated stack trace when calling `Throwable.printStackTrace`, because `StreamingQueryException`'s `toString` contains the stack trace.

How was this patch tested?

The updated `test("max files per trigger - incorrect values")`. I found this issue when I switched from `testStream` to the real code to verify the failure in this test.
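The cycle described above is easy to reproduce in miniature. The sketch below is an illustration under assumptions, not Spark code: `CyclicExecution`, `CyclicException`, `FixedExecution`, and `FixedException` are hypothetical names.

```scala
// Hypothetical miniature of the cycle: the exception's constructor asks the
// execution for its debug string, which reads a field that is not assigned yet.
class CyclicExecution {
  var deathCause: CyclicException = null
  def toDebugString: String = s"death cause: $deathCause"
  def fail(): Unit = { deathCause = new CyclicException(this) }
}

class CyclicException(exec: CyclicExecution) extends Exception(exec.toDebugString)

// One way out, in the spirit of the diff above: compute the debug string first
// and hand it to the exception as plain data.
class FixedException(debugString: String, cause: Throwable)
  extends Exception(s"${cause.getMessage} ($debugString)", cause)

class FixedExecution {
  var deathCause: FixedException = null
  def toDebugString: String = "state: failed during initialization"
  def fail(e: Throwable): Unit = { deathCause = new FixedException(toDebugString, e) }
}
```

In the cyclic version the message is built while `deathCause` is still `null`, which is how `null` ends up in the error text.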