[SPARK-48459][CONNECT][PYTHON] Implement DataFrameQueryContext in Spark Connect #46789
Conversation
cc @zhengruifeng @hvanhovell @itholic @cloud-fan @MaxGekk I need your review here 🙏
}
}

message PythonOrigin {
Please don't name stuff Python/Scala if it is not language-specific.
I actually intend this to be language-specific. For example, the Scala side could have a stack trace chain:
spark/sql/api/src/main/scala/org/apache/spark/sql/catalyst/trees/QueryContexts.scala, line 138 in 8bbbde7: `stackTrace: Seq[StackTraceElement],`
For Python, this is specifically a string for now.
I mean, I don't mind combining it for now if you believe it won't be language-specific.
Why can't the Python code be modeled as a StackTraceElement? What's the difference?
`StackTraceElement` has JDK-dedicated methods and fields (e.g., `classLoaderName`), so I think we should have a dedicated one for individual languages. While `DataFrameQueryContext` / `SQLQueryContext` have common information (for now), I think we will end up with some language-specific, dedicated information in both APIs in the future.
However, I am open to having a common one. There is a way to keep it common (and, e.g., throw an exception if a piece of information doesn't make sense in some languages).
@@ -342,6 +344,11 @@ message Expression {
}
}

message ExpressionCommon {
Are we over-engineering? What else can be put here?
It's the same as `Relation.RelationCommon`, so it's more for consistency (so we can reuse `Origin` for the call site as well). I think it's fine.
@@ -41,7 +41,7 @@ def test_dataframe_query_context(self):
    error_class="DIVIDE_BY_ZERO",
    message_parameters={"config": '"spark.sql.ansi.enabled"'},
    query_context_type=QueryContextType.DataFrame,
-   pyspark_fragment="divide",
+   fragment="__truediv__",
hmm, is it a regression?
Ah, this is actually a bug fix. Previously it was (wrongly) checking the Scala-side `DataFrameQueryContext.fragment` when it should have been checking `DataFrameQueryContext.pysparkFragment`; now it checks the right one. This is because we now merge `pysparkFragment` into `fragment` at `DataFrameQueryContext`, and it correctly tests the fragment.
On the Scala side, we use the call site that is closest to the user code as the fragment. I don't think `__truediv__` is user-friendly.
For Java, we can find the latest stack frame that is from `org.apache.spark`, whose next frame is the user code. Can we do the same thing in Python?
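The frame-walking idea suggested here could look roughly like the sketch below in Python. The helper name and the `pyspark.` prefix check are illustrative assumptions, not the actual PySpark implementation:

```python
import inspect


def user_call_site() -> str:
    """Return "file:line" for the nearest caller outside pyspark.*.

    Walks the current call stack outward and skips frames whose module
    is part of the library, mirroring the org.apache.spark check on the
    JVM side. Hypothetical helper for illustration only.
    """
    for frame_info in inspect.stack()[1:]:
        module = frame_info.frame.f_globals.get("__name__", "")
        if module != "pyspark" and not module.startswith("pyspark."):
            return f"{frame_info.filename}:{frame_info.lineno}"
    return "<unknown>"
```

Called from user code, this would report the first non-library frame, which is the line the error message should point at.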
Yeah, I think we should. Let me take a look separately after this PR; this PR isn't related to the original implementation.
Ah, actually it points it out correctly. `/` is a shortcut for `__truediv__`, and it points out the line number properly.
The missing part is hiding `pyspark.*`. I will take a separate look.
cc @itholic
I believe this is because the names are different in Python and Scala.
Python — spark/python/pyspark/sql/column.py, line 115 in f0b7cfa: `def __truediv__(`
Scala — spark/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala, line 655 in f0b7cfa: `def divide(other: Any): Column = this / other`
Can we do the same thing in Python?
I think one way to do this is to add a dict to `with_origin` that maps each Python function to the name of the corresponding Scala function.
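The dict suggested above might be sketched like this; the table entries and the helper name are hypothetical, and a real table would need to cover every `Column` operator:

```python
# Hypothetical lookup table: keys are Python magic-method names, values
# are the user-facing names used on the Scala side.
_PY_TO_SCALA_NAME = {
    "__truediv__": "divide",
    "__rtruediv__": "divide",
    "__add__": "plus",
    "__sub__": "minus",
    "__mul__": "multiply",
}


def display_fragment(func_name: str) -> str:
    """Map a captured Python function name to a user-friendly fragment,
    falling back to the original name when no mapping exists."""
    return _PY_TO_SCALA_NAME.get(func_name, func_name)
```

With this, a captured `__truediv__` would surface as `divide` in the query context, matching the Scala-side fragment.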
Actually, we can leverage `pyspark.util.try_simplify_traceback`, but let's do this in a separate PR.
.build())
.setSummary(queryCtx.summary())
.build()
} else {
Is this really an unconditional else?
For now, yes, because we only have `QueryContextType.SQL` and `QueryContextType.DataFrame`.
.newBuilder()
val context = if (queryCtx.contextType() == QueryContextType.SQL) {
builder
  .setContextType(FetchErrorDetailsResponse.QueryContext.ContextType.SQL)
did we never set this before?
Yeah, so it has always been `SQLQueryContext` by default.
.setObjectType(queryCtx.objectType())
.setObjectName(queryCtx.objectName())
.setStartIndex(queryCtx.startIndex())
.setStopIndex(queryCtx.stopIndex())
.setFragment(queryCtx.fragment())
.build())
.setSummary(queryCtx.summary())
same?
Yes, so we did not have the `QueryContext.summary()` API before this change.
@@ -379,17 +386,12 @@ class UnknownException(CapturedException, BaseUnknownException):
    """

-class QueryContext(BaseQueryContext):
+class SQLQueryContext(BaseQueryContext):
Do we consider this a private / developer API?
Only the parent class `QueryContext` is an API (at `pyspark.errors.QueryContext`) for now. This is at least consistent with the Scala side.
return str(self._q.summary())


class DataFrameQueryContext(BaseQueryContext):
Isn't the type annotation wrong here?
I see this is for the classic side.
origin = current_origin()
fragment = origin.fragment
call_site = origin.call_site
I think this is what I don't like about this approach at least for Spark Connect. There is no need to use the thread-local for this propagation.
The outside wrapper manipulates the global state for the constructor to pick up here. Shouldn't we simply generate the origin here?
I don't quite like this either, actually. Yes, we could pass the explicit call site around. It happened to be like this to match the implementation across the Catalyst optimizer, Scala, and PySpark with Spark Classic.
Let me revisit this to fix up the Spark Classic side together and see what we can do.
I'm generally OK with the PR. My only concern is that we're using a thread-local to propagate the Python query context for Spark Connect without a good reason.
The constructor on the DataFrame class could simply extract this from the call stack. In all honesty, this would probably be true as well for the PySpark Classic class.
This sideways loading of values is really not ideal.
For this, I will revisit and see if I can fix Spark Classic together.
Let's make sure we're keeping track of the follow-ups.
There are three follow-ups to make:
Working on them this week.
Merged to master.
@@ -89,7 +90,16 @@ class Expression:
    """

    def __init__(self) -> None:
-       pass
+       origin = current_origin()
+       fragment = origin.fragment
For #46789 (comment): cc @grundprinzip
Ah, okay. Now I remember why we came this way. The problem is that there's no way to get the caller API name within, say, `Expression.__init__()`, so we would have to either:
1. Manually pass the function name, or
2. Wrap individual API methods (but without using the thread-local).
The initial Spark Connect implementation was like that ([SPARK-47274][PYTHON][SQL] Provide more useful context for PySpark DataFrame API errors #45377), then I realised that it is actually error-prone and flaky (e.g., it needed a follow-up: [SPARK-47852][PYTHON] Support `DataFrameQueryContext` for reverse operations #46053). I pointed it out here: [SPARK-47274][PYTHON][SQL] Provide more useful context for PySpark DataFrame API errors #45377 (comment).
My take was that both 1 and 2 have the same flakiness and rely on some manual string name, so we went this way (in addition to matching the Scala implementation).
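The thread-local propagation being debated here can be sketched as follows. The names `current_origin` and `with_origin` follow the discussion above, but the simplified `Origin` fields and the wrapper body are assumptions, not the actual PySpark code:

```python
import functools
import threading

_local = threading.local()


class Origin:
    """Simplified call-site holder; the real Origin carries more fields."""

    def __init__(self, fragment: str = "", call_site: str = "") -> None:
        self.fragment = fragment
        self.call_site = call_site


def current_origin() -> Origin:
    # Fall back to an empty Origin when nothing has been recorded.
    return getattr(_local, "origin", Origin())


def with_origin(func):
    """Record the wrapped API's name in a thread-local so code deeper in
    the call chain (e.g. an Expression constructor) can pick it up."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        _local.origin = Origin(fragment=func.__name__)
        try:
            return func(*args, **kwargs)
        finally:
            _local.origin = Origin()  # reset so state does not leak

    return wrapper


class Expression:
    def __init__(self) -> None:
        # Picks up whatever origin the outermost wrapped API recorded.
        self.origin = current_origin()


@with_origin
def divide():
    return Expression()
```

This is the "sideways loading" the review objects to: `divide` writes global state that `Expression.__init__` reads, instead of the constructor inspecting the call stack itself.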
For #46789 (comment), #47009 (WIP) cc @cloud-fan
For SPARK-48639: #47024 cc @grundprinzip
### What changes were proposed in this pull request?
This PR proposes to add `Origin` (from #46789) to `Relation.RelationCommon`.

### Why are the changes needed?
To have the common protobuf message to keep the source code info.

### Does this PR introduce _any_ user-facing change?
No. This is not used.

### How was this patch tested?
CI should validate the protobuf definition, and existing tests should pass.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47024 from HyukjinKwon/SPARK-48639.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes to add `Origin` (from #46789) to `Relation.RelationCommon`. This is a revert of the revert.

### Why are the changes needed?
To have the common protobuf message to keep the source code info.

```diff
- // TODO(SPARK-48639): Add origin like Expression.ExpressionCommon
-
- // (Required) Shared relation metadata.
- string source_info = 1;
+ // (Optional) Shared relation metadata.
+ reserved 1;
```

Removing the field is considered a breaking change, and we would have to fix up all the other branches to avoid it, which isn't really worthwhile.

### Does this PR introduce _any_ user-facing change?
No. This is not used.

### How was this patch tested?
CI should validate the protobuf definition, and existing tests should pass.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47115 from HyukjinKwon/SPARK-48639-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
This PR proposes to implement `DataFrameQueryContext` in Spark Connect:

- Add two new protobuf messages packed together with `Expression`.
- Merge `DataFrameQueryContext.pysparkFragment` and `DataFrameQueryContext.pysparkCallSite` into the existing `DataFrameQueryContext.fragment` and `DataFrameQueryContext.callSite`.
- Separate `QueryContext` into `SQLQueryContext` and `DataFrameQueryContext` for consistency with the Scala side.
- Implement the origin logic: the `current_origin` thread-local holds the current call site and function name, and `Expression` reads it from there. They are set on individual expression messages and are used when analysis happens; this resembles the Spark SQL implementation.

See also #45377.
Why are the changes needed?
See #45377
Does this PR introduce any user-facing change?
Yes, same as #45377 but in Spark Connect.
How was this patch tested?
The same unit tests are reused in Spark Connect.
Was this patch authored or co-authored using generative AI tooling?
No.