
[SPARK-43032][CONNECT][SS] Add Streaming query manager #40861

Closed
WweiL wants to merge 22 commits into apache:master from WweiL:SPARK-43032-streaming-query-manager

Conversation


WweiL (Contributor) commented Apr 19, 2023

What changes were proposed in this pull request?

Add support for `StreamingQueryManager()` to the Spark Connect Python client.

Why are the changes needed?

Users can now use the typical streaming query manager methods by calling `session.streams` (see the usage sketch after the session transcript below).

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

Manual test and unit test

```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0.dev0
      /_/

Using Python version 3.9.16 (main, Dec  7 2022 01:11:58)
Client connected to the Spark Connect server at localhost
SparkSession available as 'spark'.
>>> q = spark.readStream.format("rate").load().writeStream.format("memory").queryName("test").start()
23/04/19 23:10:43 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-913e48b9-26d8-448f-899f-d9f5ae08707d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/19 23:10:43 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
>>> spark.streams.active
[<pyspark.sql.connect.streaming.query.StreamingQuery object at 0x7f4590400d90>]
>>> q1 = spark.streams.active[0]
>>> q1.id == q.id
True
>>> q1.runId == q.runId
True
>>> q1.runId == q.runId
True
>>> q.name
'test'
>>> q1.name
'test'
>>> q == q1
False
>>> q1.stop()
>>> q
<pyspark.sql.connect.streaming.query.StreamingQuery object at 0x7f4590400b20>
>>> q1
<pyspark.sql.connect.streaming.query.StreamingQuery object at 0x7f4590400ee0>
>>> q.isActive
False
```
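For reference, a minimal usage sketch of the manager surface from the Connect client, assuming the same method names as the classic PySpark `StreamingQueryManager` (`active`, `get`, `awaitAnyTermination`, `resetTerminated`); the connection string and the source/sink choices below are illustrative, not taken from this PR:

```python
from pyspark.sql import SparkSession

# Connect to a Spark Connect server (the address is illustrative).
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

q = (
    spark.readStream.format("rate").load()
    .writeStream.format("memory").queryName("test").start()
)

# Typical StreamingQueryManager calls, now routed through the Connect client.
active = spark.streams.active                 # list of active StreamingQuery handles
same_q = spark.streams.get(q.id)              # look up a query by its id
spark.streams.awaitAnyTermination(timeout=5)  # wait up to 5 seconds for any query to terminate
spark.streams.resetTerminated()               # clear terminated queries tracked by the manager

q.stop()
```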

Contributor

This is a breaking change...

Probably you should just do `StreamingQueryManagerCommandResult streaming_query_manager_command_result = 11;` and not re-use an existing proto field number.

Contributor Author

I see. Thank you, I'll change it back. I just wanted to keep similar names closer together...

HyukjinKwon changed the title [SPARK-43032] streaming query manager → [SPARK-43032][CONNECT][SS] Adds Streaming query manager on Apr 20, 2023
WweiL changed the title [SPARK-43032][CONNECT][SS] Adds Streaming query manager → [SPARK-43032][CONNECT][SS] Add Streaming query manager on Apr 20, 2023
WweiL force-pushed the SPARK-43032-streaming-query-manager branch from f70fc9b to aced778 on May 1, 2023 05:07
WweiL marked this pull request as ready for review on May 1, 2023 17:05
rangadi left a comment

Mostly LGTM. Left a couple of comments.

string run_id = 2;

// (Optional) The name of this query.
optional string name = 3;

Why is this needed? Note that the query name is not part of the real identity of the query. This could be an extra field, like in the response for 'WriteStreamOperation'.

Contributor Author

Right, I'm also not very sure here. Basically I added this because of the sqm.get(queryID) handler: the query may or may not have a name, but if it does and we don't return it, the client won't have the query's name.

We could also maintain a local cache of query names, but that would add more complexity for cleaning up that cache... What do you think?
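To make the trade-off concrete, here is a minimal client-side sketch using hypothetical class and helper names (not the actual Connect proto or client code): if the server's get() result carried only the query id and run id, the client-side handle could not populate its name without an extra round trip or a client-side name cache.

```python
from typing import Optional


class StreamingQueryHandle:
    """Hypothetical stand-in for the client-side StreamingQuery wrapper."""

    def __init__(self, query_id: str, run_id: str, name: Optional[str] = None):
        self._id = query_id
        self._run_id = run_id
        # Stays None unless the server includes the name in its response.
        self._name = name

    @property
    def name(self) -> Optional[str]:
        return self._name


def handle_from_get_response(response: dict) -> StreamingQueryHandle:
    # `response` is a hypothetical stand-in for the server's get() result.
    # Returning the name alongside the id avoids maintaining (and cleaning up)
    # a separate client-side cache of query names.
    return StreamingQueryHandle(
        response["id"], response["run_id"], response.get("name")
    )
```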


We can return the name, but we don't need to change StreamingQueryInstanceId. We can return both StreamingQueryInstanceId and the name.

WweiL requested a review from amaliujia on May 1, 2023 19:37

WweiL commented May 1, 2023

PTAL! @amaliujia @HyukjinKwon Thank you!

yield {
"streaming_query_manager_command_result": b.streaming_query_manager_command_result
}
cmd_result = b.streaming_query_manager_command_result
WweiL commented May 1, 2023

If I don't do this, dev/reformat-python always changes this line to

yield {
    "streaming_query_manager_command_result": b.streaming_query_manager_command_result
}

which results in a Python lint error, because the second line ends up 115 characters long (there is long indentation before it in the original file).


Looks fine to me.

rangadi left a comment

LGTM. Suggested one change, but that is OK; we could merge this.


WweiL commented May 2, 2023

Also verified null query names:

>>> q = spark.readStream.format("rate").load().writeStream.format("console").start()
>>> q1 = spark.streams.get(q.id)
>>> q1.name
''
>>> q.name
''
>>> q.name == q1.name
True

@xinrong-meng
Member

Merged to master, thanks!

LuciferYang pushed a commit to LuciferYang/spark that referenced this pull request May 10, 2023

Closes apache#40861 from WweiL/SPARK-43032-streaming-query-manager.

Lead-authored-by: Wei Liu <wei.liu@databricks.com>
Co-authored-by: Wei Liu <z920631580@gmail.com>
Signed-off-by: Xinrong Meng <xinrong@apache.org>