[SPARK-43206] [SS] [CONNECT] StreamingQuery exception() include stack trace #40966
Conversation
@rangadi @pengzhon-db @amaliujia Can you guys take a look? TY!
@allisonwang-db Could you also take a look? Thanks!
```scala
val maxSize = SparkEnv.get.conf.get(CONNECT_JVM_STACK_TRACE_MAX_SIZE)
val stackTrace = Option(ExceptionUtils.getStackTrace(e))
stackTrace.foreach { s =>
  exception_builder.setStackTrace(StringUtils.abbreviate(s, maxSize))
}
```
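For context on the truncation above: Commons Lang's `StringUtils.abbreviate` caps a string at `maxSize` characters, replacing the tail with `...`. A minimal Python sketch of that behavior (illustrative only; the real implementation lives in Apache Commons Lang):

```python
def abbreviate(s: str, max_width: int) -> str:
    # Mirror of StringUtils.abbreviate: return the string unchanged if it
    # fits, otherwise cut it and append "..." so the result is exactly
    # max_width characters long.
    if len(s) <= max_width:
        return s
    if max_width < 4:
        # Commons Lang rejects widths that cannot hold one char plus "..."
        raise ValueError("minimum abbreviation width is 4")
    return s[: max_width - 3] + "..."

trace = "org.apache.spark.SparkException: Job aborted due to stage failure"
print(abbreviate(trace, 30))  # 30 characters, ending in "..."
```

This is why the client can receive a truncated stack trace when the server-side trace exceeds `CONNECT_JVM_STACK_TRACE_MAX_SIZE`.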
The error message and stack trace are truncated due to the HTTP header size limitation (default 8k). I noticed that you're not throwing this as a StatusRuntimeException. If the same limitation doesn't apply here, you could use untruncated messages and stack traces.
Thanks! That'd be great! This exception() method was meant to provide more information, including the logical plan. If it can be done without truncation, that's even better.
+1, it seems that you are using the proto itself to carry the exception information, thus the limitation does not apply.
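The distinction being drawn here: gRPC error details ride in status/trailer metadata, which is what the ~8 KB header limit applies to, whereas a field on the response proto rides in the message body (gRPC's default receive limit there is 4 MB). A rough Python sketch of the client side, with hypothetical field names standing in for the actual Connect result proto:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class ExceptionResult:
    # Stand-ins for fields carried in the response proto body; the real
    # field names in the Connect protocol may differ.
    exception_message: Optional[str] = None
    stack_trace: Optional[str] = None

def build_error(result: ExceptionResult) -> Optional[str]:
    # Client-side reconstruction: append the server-side stack trace to
    # the message, matching the "JVM stacktrace:" rendering seen in the
    # manual tests below.
    if result.exception_message is None:
        return None
    if result.stack_trace:
        return f"{result.exception_message}\n\nJVM stacktrace:\n{result.stack_trace}"
    return result.exception_message
```

Because this payload travels in the message body rather than in error metadata, it is not subject to the header-size truncation discussed above.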
LGTM
LGTM
cc @grundprinzip for proto change.
```diff
 private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = {
-  unregisterTerminatedStream(terminatedQuery)
+  // unregisterTerminatedStream(terminatedQuery)
```
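For context on why this was flagged: `notifyQueryTermination` is the manager's cleanup hook, so commenting out the unregister call leaves every terminated query in the registry. A toy Python sketch of the pattern (registry and method names are illustrative, not Spark's actual internals):

```python
class QueryManager:
    def __init__(self):
        self._active = {}  # query id -> query object

    def register(self, query_id, query):
        self._active[query_id] = query

    def notify_query_termination(self, query_id):
        # With this line commented out, terminated queries would
        # accumulate in _active indefinitely (a memory leak).
        self._active.pop(query_id, None)

mgr = QueryManager()
mgr.register("q1", object())
mgr.notify_query_termination("q1")
print(len(mgr._active))  # 0 - the terminated query was unregistered
```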
@WweiL is this by mistake? Should we revert this?
oh shoot we should definitely revert this
…eamingQueryManager.scala`

### What changes were proposed in this pull request?

#40966 introduced an unneeded change in `StreamingQueryManager` by mistake. This fix removes it.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #41006 from WweiL/minor-sqm-uncomment.

Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…race

### What changes were proposed in this pull request?

Add stack trace to StreamingQuery's `exception()` method. Following apache@a5c8a3c#diff-98baf452f0352c75a39f39351c5f9e656675810b6d4cfd178f1b0bae9751495b

Add to both python client and scala client.

### Why are the changes needed?

Including stack trace is helpful in debugging.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manual test:

Python:
```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0.dev0
      /_/

Using Python version 3.9.16 (main, Dec  7 2022 01:11:58)
Client connected to the Spark Connect server at localhost
SparkSession available as 'spark'.
>>> from pyspark.sql.functions import col, udf
>>> from pyspark.errors import StreamingQueryException
>>> sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
>>> bad_udf = udf(lambda x: 1 / 0)
>>> sq = sdf.select(bad_udf(col("value"))).writeStream.format("memory").queryName("this_query").start()
>>> sq.exception()
StreamingQueryException('Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (ip-10-110-19-234.us-west-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File "<stdin>", line 1, in <lambda>\nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)\n\tat org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:94)\n\tat org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:75)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)\n\tat org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:438)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1554)\n\tat org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:483)\n\tat org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:422)\n\tat org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:488)\n\tat org.apache.spark.sql.execution.datasources.v2.V...\n\nJVM stacktrace:\norg.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (ip-10-110-19-234.us-west-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n File "<stdin>", line 1, in <lambda>\nZeroDivisionError: division by zero\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)\n\tat org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:94)\n\tat org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:75)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)\n\tat org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:438)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1554)\n\tat org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:483)\n\tat org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:422)\n\tat org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:488)\n\tat org.apache.spark.sql.execution.datasources.v2.V...')
```

2. Scala:
```
Spark session available as 'spark'.
   _____                  __      ______                            __
  / ___/____  ____ ______/ /__   / ____/___  ____  ____  ___  _____/ /_
  \__ \/ __ \/ __ `/ ___/ //_/  / /   / __ \/ __ \/ __ \/ _ \/ ___/ __/
 ___/ / /_/ / /_/ / /  / ,<    / /___/ /_/ / / / / / / /  __/ /__/ /_
/____/ .___/\__,_/_/  /_/|_|   \____/\____/_/ /_/_/ /_/\___/\___/\__/
    /_/

import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

val sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
sdf: org.apache.spark.sql.package.DataFrame = [value: string]

val badUdf = udf((x: String) => 1 / 0)
badUdf: org.apache.spark.sql.expressions.UserDefinedFunction = ScalarUserDefinedFunction(
  ammonite.$sess.cmd2$Helper$$Lambda$1913/745186412239d9cb7,
  ArrayBuffer(StringEncoder),
  PrimitiveIntEncoder,
  None,
  true,
  true
)

val sq = sdf.select(badUdf(col("value"))).writeStream.format("memory").queryName("this_query").start()
sq: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.streaming.RemoteStreamingQuery13866865

sq.isActive
res4: Boolean = false

sq.exception.get.toString
res5: String = """org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (ip-10-110-19-234.us-west-2.compute.internal executor driver): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2411) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.jav...

JVM stacktrace:
org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (ip-10-110-19-234.us-west-2.compute.internal executor driver): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431) ...
```

Closes apache#40966 from WweiL/SPARK-43206-exception-stk-trace-new-2.

Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
### What changes were proposed in this pull request?

Add stack trace to StreamingQuery's `exception()` method, following a5c8a3c#diff-98baf452f0352c75a39f39351c5f9e656675810b6d4cfd178f1b0bae9751495b. Add to both the Python client and the Scala client.

### Why are the changes needed?

Including the stack trace is helpful in debugging.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manual test:

Python: