[SPARK-41533][CONNECT] Proper Error Handling for Spark Connect Server / Client #39212
Conversation
I'm working on implementing centralized PySpark error messages from #39137, and it will cover all errors generated by PySpark packages, including Spark Connect.
python/pyspark/sql/connect/client.py (Outdated)
```python
class SparkConnectClientException(Exception):
    def __init__(self, message: str) -> None:
        super(SparkConnectClientException, self).__init__(message)


class SparkConnectAnalysisException(SparkConnectClientException):
    def __init__(self, reason: str, message: str, plan: str) -> None:
        self._reason = reason
        self._message = message
        self._plan = plan

    def __str__(self) -> str:
        return f"{self._message}\nPlan: {self._plan}"
```
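For illustration only (not code from the PR), here is how the `__str__` override above would render one of these exceptions; the field values are made up:

```python
# Hypothetical values, purely to show the __str__ formatting above.
exc = SparkConnectAnalysisException(
    reason="org.apache.spark.sql.AnalysisException",
    message="Column 'foo' does not exist.",
    plan="Project [foo]\n+- Range (0, 10, step=1)",
)
print(exc)
# Column 'foo' does not exist.
# Plan: Project [foo]
# +- Range (0, 10, step=1)
```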
FYI: I just created a ticket for migrating Spark Connect errors into error classes in the future: SPARK-41712.
I'm definitely open to suggestions on the design of the error structure. If it doesn't make sense, please let me know.
...connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
Add tests for SparkConnectAnalysisException with a detailed error message?
cc @itholic, who has more experience with error messages.
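As a hedged sketch of what such a test might look like (the test name and assertions are illustrative, not the PR's actual tests):

```python
# Illustrative only: exercises the __str__ contract of the proposed
# SparkConnectAnalysisException with hypothetical field values.
def test_analysis_exception_str_includes_plan() -> None:
    exc = SparkConnectAnalysisException(
        reason="org.apache.spark.sql.AnalysisException",
        message="cannot resolve 'foo' given input columns: [id]",
        plan="Project [foo]",
    )
    assert "cannot resolve 'foo'" in str(exc)
    assert "Plan: Project [foo]" in str(exc)
```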
...connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
```diff
@@ -436,3 +568,39 @@ def _execute_and_fetch(self, req: pb2.ExecutePlanRequest) -> "pandas.DataFrame":
         if m is not None:
             df.attrs["metrics"] = self._build_metrics(m)
         return df
+
+    def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
```
Is it possible to match the same logic in spark/python/pyspark/sql/utils.py, lines 158 to 197 at 764edaf?
```python
def convert_exception(e: Py4JJavaError) -> CapturedException:
    assert e is not None
    assert SparkContext._jvm is not None
    assert SparkContext._gateway is not None

    jvm = SparkContext._jvm
    gw = SparkContext._gateway
    if is_instance_of(gw, e, "org.apache.spark.sql.catalyst.parser.ParseException"):
        return ParseException(origin=e)
    # Order matters. ParseException inherits AnalysisException.
    elif is_instance_of(gw, e, "org.apache.spark.sql.AnalysisException"):
        return AnalysisException(origin=e)
    elif is_instance_of(gw, e, "org.apache.spark.sql.streaming.StreamingQueryException"):
        return StreamingQueryException(origin=e)
    elif is_instance_of(gw, e, "org.apache.spark.sql.execution.QueryExecutionException"):
        return QueryExecutionException(origin=e)
    elif is_instance_of(gw, e, "java.lang.IllegalArgumentException"):
        return IllegalArgumentException(origin=e)
    elif is_instance_of(gw, e, "org.apache.spark.SparkUpgradeException"):
        return SparkUpgradeException(origin=e)

    c: Py4JJavaError = e.getCause()
    stacktrace: str = jvm.org.apache.spark.util.Utils.exceptionString(e)
    if c is not None and (
        is_instance_of(gw, c, "org.apache.spark.api.python.PythonException")
        # To make sure this only catches Python UDFs.
        and any(
            map(
                lambda v: "org.apache.spark.sql.execution.python" in v.toString(), c.getStackTrace()
            )
        )
    ):
        msg = (
            "\n An exception was thrown from the Python worker. "
            "Please see the stack trace below.\n%s" % c.getMessage()
        )
        return PythonException(msg, stacktrace)
    return UnknownException(desc=e.toString(), stackTrace=stacktrace, cause=c)
```
Let me see what I can do without replicating a gigantic list of branches on the server and client side.
I simplified the logic because it would be messy to replicate all of the exception types exactly from the SQL side. Right now, the printed output looks like this, for example:
```
SparkConnectException: (org.apache.spark.SparkNumberFormatException) [CAST_INVALID_INPUT] The value 'id' of the type "STRING" cannot be cast to "DOUBLE" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.
== SQL(line 1, position 8) ==
select cast('id' as double) from range(10)
```
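As a rough sketch of the client-side pattern (not the PR's exact code; the exception class and helper name here are illustrative), the handler can try to unpack a google.rpc.Status from the failed call and re-raise it as a typed exception:

```python
import grpc
from typing import NoReturn
from grpc_status import rpc_status  # optional grpcio-status package


class SparkConnectException(Exception):
    """Wraps server-side failures so users never see raw gRPC errors."""


def handle_rpc_error(rpc_error: grpc.RpcError) -> NoReturn:
    # If the server attached a google.rpc.Status to the trailing metadata,
    # surface its message; otherwise fall back to the plain gRPC details.
    status = rpc_status.from_call(rpc_error) if isinstance(rpc_error, grpc.Call) else None
    if status is not None:
        raise SparkConnectException(status.message) from None
    raise SparkConnectException(str(rpc_error)) from None
```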
Looks good otherwise. It's nice to have a way to surface better errors.
Can one of the admins verify this patch?
Pushing new version now.
...connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
Last nit: maybe we should add the new deps to https://github.com/apache/spark/blob/master/python/docs/source/getting_started/install.rst#dependencies.
Otherwise LGTM
Updated the documentation.
Merged to master.
```diff
@@ -303,6 +376,7 @@ def __init__(self, connectionString: str, userId: Optional[str] = None):
 
         self._channel = self._builder.toChannel()
         self._stub = grpc_lib.SparkConnectServiceStub(self._channel)
+        # Configure logging for the SparkConnect client.
```
@grundprinzip @HyukjinKwon this comment is kind of dangling here... should something be called here?
What changes were proposed in this pull request?
This PR improves error handling on the Spark Connect server and client side. First, it moves the server's error handling logic into a common error handler partial function that differentiates between internal Spark errors and other runtime errors.
For custom Spark exceptions, the actual internal error is wrapped into a Google RPC Status and sent as trailing metadata to the client.
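The server is Scala, but the same packing pattern is easy to show with Python gRPC; the following is a hedged illustration only (the helper name and the ErrorInfo fields are assumptions, not the PR's code):

```python
import grpc
from google.protobuf import any_pb2
from google.rpc import code_pb2, error_details_pb2, status_pb2
from grpc_status import rpc_status


def abort_with_spark_error(context: grpc.ServicerContext, exc: Exception) -> None:
    # Record which exception type occurred so the client can pick a
    # matching client-side exception class.
    info = error_details_pb2.ErrorInfo(
        reason=type(exc).__name__,
        domain="org.apache.spark",
    )
    detail = any_pb2.Any()
    detail.Pack(info)
    status = status_pb2.Status(
        code=code_pb2.INTERNAL,
        message=str(exc),
        details=[detail],
    )
    # to_status() serializes the google.rpc.Status into the
    # "grpc-status-details-bin" trailing metadata entry.
    context.abort_with_status(rpc_status.to_status(status))
```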
On the client side, similarly, the error handling is moved into a common function. All gRPC errors are wrapped into custom exceptions to avoid presenting the user with confusing gRPC errors. If available, the attached RPC status is extracted and added to the exception.
Lastly, this patch adds basic logging functionality that can be enabled using the environment variable `SPARK_CONNECT_LOG_LEVEL`, which can be set to `info`, `warn`, `error`, and `debug`.
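A minimal sketch of such env-driven setup, assuming the level names map onto the stdlib logging levels (the PR's actual mechanics may differ):

```python
import logging
import os


def _configure_logging() -> logging.Logger:
    # Illustrative only: map SPARK_CONNECT_LOG_LEVEL onto stdlib levels.
    logger = logging.getLogger("SparkConnect")
    level = os.getenv("SPARK_CONNECT_LOG_LEVEL", "")
    if level:
        logger.setLevel(getattr(logging, level.upper(), logging.ERROR))
        logger.addHandler(logging.StreamHandler())
    return logger
```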
Why are the changes needed?
Usability
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT