Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-42960] [CONNECT] [SS] Add await_termination() and exception() API for Streaming Query in Python #40785

Closed
wants to merge 39 commits into from

Conversation

WweiL
Copy link
Contributor

@WweiL WweiL commented Apr 13, 2023

What changes were proposed in this pull request?

Add the await_termination() and exception() to streaming query class.

For exception, only pass the message the same way in SparkConnectService, and construct the error the same way as the convert_exception method in _handle_rpc_error in client.py.

For await_termination, send the command multiple times instead of waiting to prevent RPC timeout. <-- I'm definitely open to any discussion on its implementation!

Why are the changes needed?

Add missing APIs.

Does this PR introduce any user-facing change?

Yes but part of ongoing developing of Streaming Spark Connect.

How was this patch tested?

Existing unit tests. Note that the unit tests for them are still skipped because of 1. queryManager is not implemented. 2. Allow access to stopped query is not implemented.

I was able to test them manually by

  1. For test_stream_await_termination(), comment out the
for q in self.spark.streams.active:
    q.stop()
  1. For test_stream_exception(), comment out unregistering terminated query:
    https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L411

@WweiL
Copy link
Contributor Author

WweiL commented Apr 13, 2023

@HyukjinKwon @rangadi @pengzhon-db

Hey guys, PTAL when you get a chance, thanks!

@@ -298,6 +306,16 @@ message StreamingQueryCommandResult {
// Logical and physical plans as string
string result = 1;
}

message ExceptionResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need at least the stacktrace. Leave a comment about what our thinking is there.

Copy link
Contributor Author

@WweiL WweiL Apr 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree, currently the support for error is limited.

I tracked how they handle errors, and found that it's through here:

raise convert_exception(info, status.message) from None

And then in the convert_exception method:

elif "org.apache.spark.sql.streaming.StreamingQueryException" in classes:
return StreamingQueryException(message)

Only the message is directly passed.

I guess we could file a ticket to wait until batch side's change, and then we could align with them?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should try to be consistent with what current exception() returns, which is return CapturedStreamingQueryException(msg, stackTrace, je.getCause()) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right but satckTrace and cause are not included in connect's error framework so far. There is an ongoing PR about this: #40575.

python/pyspark/sql/connect/streaming/query.py Outdated Show resolved Hide resolved
@@ -196,7 +196,7 @@ def awaitTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
>>> sq.stop()
"""
if timeout is not None:
if not isinstance(timeout, (int, float)) or timeout < 0:
if not isinstance(timeout, (int, float)) or timeout <= 0:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WweiL
Copy link
Contributor Author

WweiL commented Apr 18, 2023

@HyukjinKwon Can you take a look? Thanks!

@HyukjinKwon
Copy link
Member

@WweiL mind rebasing this one please?

@WweiL
Copy link
Contributor Author

WweiL commented Apr 19, 2023

Fetched and Merged with master. I noticed stack trace was added. I'll create a SPARK ticket to include it

respBuilder.getAwaitTerminationBuilder
.setTerminated(terminated)
} else {
query.awaitTermination()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm .. just to be extra clear, it will be disconnected when it reaches gRPC timeout .. am i correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right this is intended at this stage. @rangadi will push update regarding this I believe

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client keep sending heart beat message to keep the RPC connection alive.
That said, we would still need to improve handling of this. E.g. it should exit if client side disconnects.

@HyukjinKwon
Copy link
Member

@hvanhovell or @grundprinzip actually mind taking a look please when you find some time?

@HyukjinKwon
Copy link
Member

I see .. sorry for a bit of back and forth:

python/pyspark/sql/connect/streaming/query.py:72: error: Missing return statement  [return]
python/pyspark/sql/connect/streaming/query.py:108: error: Missing return statement  [return]
python/pyspark/sql/connect/streaming/query.py:140: error: Missing return statement  [return]

Let's add them back :-) ..

@WweiL WweiL changed the title [SPARK-42960] Add await_termination() and exception() API for Streaming Query [SPARK-42960] [CONNECT] [SS] Add await_termination() and exception() API for Streaming Query Apr 20, 2023
@WweiL WweiL changed the title [SPARK-42960] [CONNECT] [SS] Add await_termination() and exception() API for Streaming Query [SPARK-42960] [CONNECT] [SS] Add await_termination() and exception() API for Streaming Query in Python Apr 20, 2023
@WweiL
Copy link
Contributor Author

WweiL commented Apr 20, 2023

@HyukjinKwon Can you merge this : ) Thanks!

@HyukjinKwon
Copy link
Member

Merged to master.

HyukjinKwon added a commit that referenced this pull request Jan 13, 2024
…logics out

### What changes were proposed in this pull request?

This PR factor Connect/non-Connect specific logics out into dedicated test classes. This PR is a followup of #40785

### Why are the changes needed?

In order to avoid test failure such as #44698 (comment) by missing dependencies

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

CI in this PR should verify it.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44715 from HyukjinKwon/SPARK-42960-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants