
[SPARK-47363][SS] Initial State without state reader implementation for State API v2. #45467

Closed
wants to merge 28 commits into apache:master from jingz-db/initial-state-state-v2

Conversation

jingz-db
Contributor

@jingz-db jingz-db commented Mar 11, 2024

What changes were proposed in this pull request?

This PR adds support for users to provide a DataFrame that can be used to instantiate state for the query in the first batch, for arbitrary state API v2.

Note that populating the initial state will only happen for the first batch of the new streaming query. Trying to re-initialize state for the same grouping key will result in an error.

Why are the changes needed?

These changes are needed to support initial state. They are part of the work to add a new stateful streaming operator for arbitrary state management, which provides the new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939

Does this PR introduce any user-facing change?

Yes.
This PR introduces a new function:

```
def transformWithState(
      statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
      timeoutMode: TimeoutMode,
      outputMode: OutputMode,
      initialState: KeyValueGroupedDataset[K, S]): Dataset[U]
```
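
As a hedged illustration (the `MyCountProcessor` class, the input dataset, and all paths and column names below are hypothetical, not part of this PR), a caller might seed per-key state from a previous query's output like this:

```
// Hypothetical usage sketch. MyCountProcessor is assumed to extend
// StatefulProcessorWithInitialState[String, InputRow, OutputRow, Long];
// it and the parquet path are made up for illustration.
import spark.implicits._

val initialState: KeyValueGroupedDataset[String, Long] =
  spark.read.parquet("/tmp/prior-query-counts")   // e.g. (key, count) pairs
    .as[(String, Long)]
    .groupByKey(_._1)
    .mapValues(_._2)

val output = inputRows                            // streaming Dataset[InputRow]
  .groupByKey(_.key)
  .transformWithState(
    new MyCountProcessor(),
    TimeoutMode.NoTimeouts(),
    OutputMode.Append(),
    initialState)
```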

How was this patch tested?

Unit tests in `TransformWithStateWithInitialStateSuite`

Was this patch authored or co-authored using generative AI tooling?

No

@jingz-db jingz-db changed the title WIP [SS] Initial State without state reader implementation for State API v2. Mar 11, 2024
@jingz-db jingz-db marked this pull request as ready for review March 12, 2024 01:21
@jingz-db jingz-db changed the title [SS] Initial State without state reader implementation for State API v2. [SS][SPARK-47363] Initial State without state reader implementation for State API v2. Mar 12, 2024
@@ -85,3 +85,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
statefulProcessorHandle
}
}

/**
* Similar usage as StatefulProcessor. Represents the arbitrary stateful logic that needs to
Contributor

Maybe reword this - Stateful processor with support for specifying initial state. Accepts a user-defined type as initial state to be initialized in the first batch. This can be used for starting a new streaming query with existing state from a previous streaming query?
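
For reference, the trait being documented looks roughly like this (reconstructed from the signatures quoted elsewhere in this thread, and using the wording suggested above; the exact shape may differ in the merged patch):

```
/**
 * Stateful processor with support for specifying initial state. Accepts a
 * user-defined type as initial state to be initialized in the first batch.
 * This can be used for starting a new streaming query with existing state
 * from a previous streaming query.
 */
private[sql] trait StatefulProcessorWithInitialState[K, I, O, S]
  extends StatefulProcessor[K, I, O] {

  /**
   * Called once per grouping key in the user-provided initial state,
   * before any input rows are processed in the first batch.
   */
  def handleInitialState(key: K, initialState: S): Unit
}
```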

@@ -665,7 +665,8 @@ class KeyValueGroupedDataset[K, V] private[sql](
outputMode: OutputMode = OutputMode.Append()): Dataset[U] = {
Dataset[U](
sparkSession,
TransformWithState[K, V, U](
// The last K type is only to silence compiler error
Contributor

Any way to avoid this?

@anishshri-db
Contributor

The error seems relevant on the MIMA checks:

```
problems with Sql module: 
method transformWithState(org.apache.spark.sql.streaming.StatefulProcessorWithInitialState,org.apache.spark.sql.streaming.TimeoutMode,org.apache.spark.sql.streaming.OutputMode,org.apache.spark.sql.KeyValueGroupedDataset,org.apache.spark.sql.Encoder,org.apache.spark.sql.Encoder)org.apache.spark.sql.Dataset in class org.apache.spark.sql.KeyValueGroupedDataset does not have a correspondent in client version
```

We probably need to update the Connect variants as well.

key: String,
initialState: (String, Double)): Unit = {
val initStateVal = initialState._2
_valState.update(initStateVal)
Contributor

Can we simulate an actual case class for initial state that stores list/map and/or iterator for list values/iterator for map key-values?
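
A hedged sketch of what such a test could exercise (the case class, field names, and the `_valState`/`_listState`/`_mapState` handles below are illustrative assumptions, not the names used in this PR's test suite):

```
// Illustrative initial-state type carrying collection-valued fields.
case class FruitCountInitialState(
    total: Double,
    recentCounts: List[Double],
    countsPerColor: Map[String, Long])

// Fragment of a hypothetical StatefulProcessorWithInitialState
// implementation; assumes _valState, _listState and _mapState were
// created from the handle in init().
def handleInitialState(
    key: String,
    initialState: FruitCountInitialState): Unit = {
  _valState.update(initialState.total)
  initialState.recentCounts.foreach(_listState.appendValue)
  initialState.countsPerColor.foreach { case (color, count) =>
    _mapState.updateValue(color, count)
  }
}
```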

* the query in the first batch.
*
*/
def transformWithState[U: Encoder, S: Encoder](
Contributor

private[sql]

We want to defer exposing the API to the public until we complete the work.
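
i.e. something like the following (signature sketch only, body unchanged):

```
// Keep the new overload internal until the feature set is complete.
private[sql] def transformWithState[U: Encoder, S: Encoder](
    statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
    timeoutMode: TimeoutMode,
    outputMode: OutputMode,
    initialState: KeyValueGroupedDataset[K, S]): Dataset[U]
```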

@github-actions github-actions bot added the DOCS label Mar 20, 2024
child.execute().mapPartitionsWithStateStore[InternalRow](
if (hasInitialState) {
val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
val hadoopConfBroadcast = sparkContext.broadcast(
Contributor

Why do we need to do this ?

Contributor Author

I am not 100% sure, but this will distribute the read-only variable hadoopConf to all executors, similar to here:

```
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val hadoopConfBroadcast = session.sparkContext.broadcast(
  new SerializableConfiguration(session.sessionState.newHadoopConf()))
```

Contributor

Yeah, there is a code comment. The practice seems to be that it's better to use a broadcast rather than task serialization, as the configuration could be huge.
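
A hedged sketch of the pattern in question (names assumed, not this PR's exact code): broadcast the configuration once from the driver, then read it inside each task instead of capturing it in every task closure:

```
import org.apache.spark.util.SerializableConfiguration

// Hadoop Configuration is not Java-serializable (and can be ~10 KB), so
// wrap it and broadcast it once rather than shipping it with every task.
val hadoopConfBroadcast = sparkContext.broadcast(
  new SerializableConfiguration(session.sessionState.newHadoopConf()))

rdd.mapPartitions { iter =>
  // Executors read the already-distributed value; no per-task serialization.
  val hadoopConf = hadoopConfBroadcast.value.value
  iter  // ... use hadoopConf while processing the partition
}
```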

child.execute().mapPartitionsWithStateStore[InternalRow](
if (hasInitialState) {
val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
val hadoopConfBroadcast =
Contributor

I mean, this was only needed for the batch support part, right?

Contributor

@HeartSaVioR HeartSaVioR left a comment

Not yet reviewed the test suite, though I guess Anish has reviewed in detail.

child = logicalPlan,
initialState.groupingAttributes,
initialState.dataAttributes,
initialState.queryExecution.logical
Contributor

Shall we follow the practice we did in flatMapGroupsWithState, for safety's sake?

initialState.queryExecution.analyzed
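
Applied to the snippet above, the suggestion amounts to (other constructor arguments elided, as in the diff):

```
  child = logicalPlan,
  initialState.groupingAttributes,
  initialState.dataAttributes,
- initialState.queryExecution.logical
+ initialState.queryExecution.analyzed  // eagerly analyzed, as in flatMapGroupsWithState
```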

@@ -268,11 +268,13 @@ class IncrementalExecution(
)

case t: TransformWithStateExec =>
val hasInitialState = (isFirstBatch && t.hasInitialState)
Contributor

I don't think we want to allow adding state in the middle of the query lifecycle. Here isFirstBatch does not mean batch ID = 0; it means this is the first batch in this query run.

This should follow the logic above that we did for FlatMapGroupsWithStateExec: currentBatchId == 0L.
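
i.e., something along these lines:

```
// Sketch of the suggested condition: seed initial state only in the very
// first microbatch of the query (batch ID 0), not merely the first batch
// of the current run.
val hasInitialState = (currentBatchId == 0L && t.hasInitialState)
```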

Contributor

Please let me know if this is different functionality from what we had in flatMapGroupsWithState.

processData(store, singleIterator)
}
} else {
// If the query is running in batch mode, we need to create a new StateStore and instantiate
Contributor

nit: apply the same practice (broadcast) while we are here?

useMultipleValuesPerKey = true)
val store = stateStoreProvider.getStore(0)

processDataWithInitialState(store, childDataIterator, initStateIterator)
Contributor

We close the state store and state store provider in the batch codepath (see below). Shall we do that here as well?

Also, this is a good illustration that we have duplicated code: the two batch parts are similar in spinning up a state store provider and state store, and in closing them. That could be extracted out.

Contributor Author

Good advice! Refactored the duplicated code into initNewStateStoreAndProcessData().
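
For illustration, a shared helper along these lines (names and parameters assumed, not the exact merged code) would cover both batch codepaths and guarantee cleanup:

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreProvider}
import org.apache.spark.util.CompletionIterator

// Sketch: create a fresh store for batch execution, run the processing
// function, and close the provider once the output iterator is consumed.
private def initNewStateStoreAndProcessData(
    createProvider: () => StateStoreProvider)(
    processData: StateStore => Iterator[InternalRow]): Iterator[InternalRow] = {
  val provider = createProvider()
  val store = provider.getStore(0)
  val outputIterator = processData(store)
  CompletionIterator[InternalRow, Iterator[InternalRow]](
    outputIterator, provider.close())
}
```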


// Check if is first batch
// Only process initial states for first batch
if (processorHandle.getQueryInfo().getBatchId == 0) {
Contributor

OK, I see we have multiple checks. It is still better to change the condition in IncrementalExecution, though, as a reader could misread it as an inconsistency between flatMapGroupsWithState and transformWithState.

@beliefer beliefer changed the title [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [SPARK-47363][SS] Initial State without state reader implementation for State API v2. Mar 27, 2024
@github-actions github-actions bot added the BUILD label Mar 27, 2024
@github-actions github-actions bot removed the BUILD label Mar 27, 2024
Contributor

@HeartSaVioR HeartSaVioR left a comment

+1 pending CI

@HeartSaVioR
Contributor

CI failure isn't related - only pyspark-connect failed.

@HeartSaVioR
Contributor

Thanks! Merging to master.

sweisdb pushed a commit to sweisdb/spark that referenced this pull request Apr 1, 2024
…or State API v2

Closes apache#45467 from jingz-db/initial-state-state-v2.

Lead-authored-by: jingz-db <jing.zhan@databricks.com>
Co-authored-by: Jing Zhan <135738831+jingz-db@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>