[SPARK-43803][SS][CONNECT] Improve awaitTermination() to handle client disconnects #41318

Closed

pengzhon-db wants to merge 5 commits into apache:master from pengzhon-db:await_termination_improvement

Conversation

@pengzhon-db (Contributor) commented May 25, 2023

What changes were proposed in this pull request?

Streaming awaitTermination() is a long-running API. Currently it keeps running on the server even if the client disconnects. This change periodically checks whether the client has disconnected; if so, the server stops the operation and releases its resources.

We use gRPC Context.isCancelled() to determine whether the client has disconnected and the response can no longer be returned to it. [Here is the reference for isCancelled()](https://grpc.github.io/grpc/cpp/classgrpc_1_1_server_context.html#af2d0f087805b4b475d01b12d73508f09):

> Return whether this RPC failed before the server could provide its status back to the client.
>
> This could be because of explicit API cancellation from the client-side or server-side, because of deadline exceeded, network connection reset, HTTP/2 parameter configuration (e.g., max message size, max connection age), etc. It does NOT include failure due to a non-OK status return from the server application's request handler, including [Status::CANCELLED](https://grpc.github.io/grpc/cpp/classgrpc_1_1_status.html#a9994ffe95a0495915d82481c2ec594ab).
>
> IsCancelled is always safe to call when using sync or callback API. When using async API, it is only safe to call IsCancelled after the AsyncNotifyWhenDone tag has been delivered. Thread-safe.
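
For illustration, here is a minimal Scala sketch of the polling idea described above. It is not the exact code in this PR: the helper name, the poll interval, and the exception type are assumptions; only `Context.isCancelled()` and `StreamingQuery.awaitTermination(timeoutMs)` come from the real APIs, and the error message matches the one checked in the manual test below.

```scala
import io.grpc.Context
import org.apache.spark.sql.streaming.StreamingQuery

object AwaitTerminationSketch {
  // Hypothetical server-side helper: instead of blocking indefinitely in
  // awaitTermination(), wait in small increments and bail out once the gRPC
  // context reports that the RPC was cancelled (e.g. client disconnected).
  def awaitTerminationHandlingDisconnect(query: StreamingQuery): Unit = {
    val pollIntervalMs = 1000L // assumed polling interval
    var terminated = false
    while (!terminated) {
      if (Context.current().isCancelled()) {
        // The client disconnected or the RPC was otherwise cancelled:
        // stop waiting so the handler can release its resources.
        throw new RuntimeException(
          "RPC context is cancelled when executing awaitTermination()")
      }
      // awaitTermination(timeoutMs) returns true once the query has
      // terminated, false if the timeout elapses first.
      terminated = query.awaitTermination(pollIntervalMs)
    }
  }
}
```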

Why are the changes needed?

The change improves the handling of awaitTermination() and avoids wasting resources on the server side after the client has disconnected.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing unit tests.
Also tested manually on a local setup with the following steps:

  1. Start Spark Connect.
  2. Create a streaming query.
  3. Call query.awaitTermination().
  4. exit() the client to disconnect it.
  5. Check that an error ("RPC context is cancelled when executing awaitTermination()") is logged on the server. This confirms that awaitTermination() exits on the server side when the client disconnects.
```
>>> query = (
...  spark
...  .readStream
...  .format("rate")
...  .option("numPartitions", "1")
...  .load()
...  .writeStream
...  .format("memory")
...  .queryName("tableName_35")
...  .start()
... )

>>>
>>> query.awaitTermination()
...
>>> exit()
```

@rangadi left a comment

Left a few comments. Can we add a unit test for this?

@pengzhon-db (Contributor, Author)

> Left a few comments. Can we add a unit test for this?

What kind of unit test are you referring to? We have an existing Spark Connect awaitTermination unit test. Right now we don't have a way to simulate a client disconnect from the Python side. Do you think we should add a unit test for that?

@rangadi commented Jun 6, 2023

> What kind of unit test are you referring to? We have an existing Spark Connect awaitTermination unit test. Right now we don't have a way to simulate a client disconnect from the Python side. Do you think we should add a unit test for that?

We could disconnect in the test (since we have access to the client). Maybe that could be part of the PR for the client-side changes.

@pengzhon-db (Contributor, Author)

@HyukjinKwon Could you review this PR?

@HyukjinKwon (Member)

Is this good to go, @rangadi?

@rangadi left a comment

@HyukjinKwon LGTM. Will check the CI failure.

@rangadi commented Jun 8, 2023

@pengzhon-db please fix the Scala format error.

@pengzhon-db (Contributor, Author)

@HyukjinKwon can you help merge this? Thanks

@HyukjinKwon (Member)

Merged to master.

czxm pushed a commit to czxm/spark that referenced this pull request on Jun 12, 2023:

[SPARK-43803][SS][CONNECT] Improve awaitTermination() to handle client disconnects

Closes apache#41318 from pengzhon-db/await_termination_improvement.

Authored-by: pengzhon-db <peng.zhong@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>