
[Draft] Streaming Spark Connect POC#40373

Closed
rangadi wants to merge 10 commits into apache:master from rangadi:ssc-poc

Conversation


@rangadi rangadi commented Mar 10, 2023

[This is not meant for merge, but a preliminary POC for streaming support in Spark Connect].

This includes basic functionality to run streaming queries over Spark Connect.
The expectation is 1:1 parity with the standard streaming API.

How to try it in local mode (./bin/pyspark --remote "local[*]"):

>>> 
>>> query = ( 
...   spark
...     .readStream
...     .format("rate")
...     .option("numPartitions", "1")
...     .load()
...     .withWatermark("timestamp", "1 minute")
...     .groupBy(window("timestamp", "10 seconds"))
...     .count() # count for each 10 seconds.
...     .writeStream
...     .format("memory")
...     .queryName("rate_table")
...     .trigger(processingTime="10 seconds")
...     .start()
... )
>>>
>>> query.isActive
True
>>> 
>>> spark.sql("select window.start, count from rate_table").show()
+-------------------+-----+
|              start|count|
+-------------------+-----+
|2023-03-11 22:45:40|    6|
|2023-03-11 22:45:50|   10|
+-------------------+-----+
>>> 
>>> # Query Status
>>> import json
>>> print(json.dumps(query.status, indent=4))
{
    "message": "Waiting for next trigger",
    "isDataAvailable": true,
    "isTriggerActive": false
}
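Since query.status comes back as a plain dict, it is easy to poll programmatically. A minimal sketch (the helper name wait_for_data and the timeout knobs are ours, not part of this PR):

```python
import time

def wait_for_data(get_status, timeout_s=60, poll_s=0.5):
    """Poll a status-returning callable (e.g. lambda: query.status)
    until isDataAvailable is True, then return that status dict."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = get_status()
        if status["isDataAvailable"]:
            return status
        time.sleep(poll_s)
    raise TimeoutError(f"no data available after {timeout_s}s")
```

With the query above this would be called as wait_for_data(lambda: query.status).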

>>> # Streaming Progress (lastProgress)
>>> print(json.dumps(query.lastProgress, indent=4))
{
    "id": "88064cec-3418-4ad3-90aa-17043458f540",
    "runId": "24ff7f44-7f4b-424a-b632-dad62c6c92dd",
    "name": "rate_table",
    "timestamp": "2023-03-12T06:48:40.006Z",
    "batchId": 18,
    "numInputRows": 10,
    "inputRowsPerSecond": 0.9997000899730081,
    "processedRowsPerSecond": 33.670033670033675,
    "durationMs": {
        "addBatch": 109,
        "commitOffsets": 85,
        "getBatch": 0,
        "latestOffset": 0,
        "queryPlanning": 5,
        "triggerExecution": 297,
        "walCommit": 98
    },
    "eventTime": {
        "avg": "2023-03-12T06:48:34.306Z",
        "max": "2023-03-12T06:48:38.806Z",
        "min": "2023-03-12T06:48:29.806Z",
        "watermark": "2023-03-12T06:47:28.806Z"
    },
    "stateOperators": [
        {
            "operatorName": "stateStoreSave",
            "numRowsTotal": 8,
            "numRowsUpdated": 2,
     [...]
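Because lastProgress is also a plain dict, per-trigger timing can be inspected client-side without any extra Spark support. A sketch (the helper name is ours; the dict shape matches the output above):

```python
def trigger_breakdown(progress):
    """Split a lastProgress-style dict's durationMs into the overall
    triggerExecution time and the per-phase costs, most expensive first."""
    phases = dict(progress["durationMs"])
    total = phases.pop("triggerExecution", None)
    return total, sorted(phases.items(), key=lambda kv: kv[1], reverse=True)
```

For the progress shown above this returns 297 ms total, led by addBatch (109 ms) and walCommit (98 ms).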

>>> query.stop()
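Whether Connect's stop() blocks until the query has fully shut down is an open question in a POC, so tests may want to poll isActive after stopping. A defensive sketch (stop_and_wait and its timeout knobs are hypothetical helpers, assuming only the StreamingQuery-style stop()/isActive surface shown above):

```python
import time

def stop_and_wait(query, timeout_s=30, poll_s=0.2):
    """Request stop() and block until the query reports inactive."""
    query.stop()
    deadline = time.monotonic() + timeout_s
    while query.isActive:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"query still active after {timeout_s}s")
        time.sleep(poll_s)
```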

Next steps:

We need to add the remaining features:

  • Scala API
  • Streaming listener
  • Unimplemented / missing APIs, e.g.
    • All the triggers
    • to_table()
    • text() etc.
  • StreamingQueryManager API
  • tests, including running existing tests with spark-connect
  • and some more.


rangadi commented Mar 10, 2023

cc: @HeartSaVioR

@rangadi rangadi closed this Apr 7, 2023