Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SS][SPARK-47331] Serialization using case classes/primitives/POJO based on SQL encoder for Arbitrary State API v2. #45447

Closed
wants to merge 5 commits into from

Conversation

jingz-db
Copy link
Contributor

@jingz-db jingz-db commented Mar 8, 2024

What changes were proposed in this pull request?

In the new operator for arbitrary state-v2, we cannot rely on the session/encoder being available since the initialization for the various state instances happens on the executors. Hence, for the state serialization, we propose to let user explicitly pass in encoder for state variable and serialize primitives/case classes/POJO with SQL encoder. Leveraging SQL encoder can speed up the serialization.

Why are the changes needed?

These changes are needed for providing a dedicated serializer for state-v2.
The changes are part of the work around adding new stateful streaming operator for arbitrary state mgmt that provides a bunch of new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939

Does this PR introduce any user-facing change?

Users will need to specify the SQL encoder for their state variable:
def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T]
def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T]

For primitive type, Encoder is something as: Encoders.scalaLong; for case class, Encoders.product[CaseClass]; for POJO, Encoders.bean(classOf[POJOClass])

How was this patch tested?

Unit tests for primitives, case classes, POJO separately in ValueStateSuite

Was this patch authored or co-authored using generative AI tooling?

No

@jingz-db jingz-db changed the title SQL encoder [SS] Serialization using case classes/primitives/POJO based on SQL encoder for Arbitrary State API v2. Mar 8, 2024
@jingz-db jingz-db marked this pull request as ready for review March 8, 2024 23:31
@jingz-db jingz-db changed the title [SS] Serialization using case classes/primitives/POJO based on SQL encoder for Arbitrary State API v2. [SS][SPARK-47331] Serialization using case classes/primitives/POJO based on SQL encoder for Arbitrary State API v2. Mar 8, 2024
* @param stateName - name of logical state partition
* @tparam GK - grouping key type
* @tparam S - value type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Could we rename type param to V instead ?

}

public POJOTestClass(String name, int id) {
this.name = name;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indent/spacing seems off ?

@@ -93,7 +96,7 @@ class ValueStateSuite extends SharedSparkSession
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])

val stateName = "testState"
val testState: ValueState[Long] = handle.getValueState[Long]("testState")
val testState: ValueState[Long] = handle.getValueState[Long]("testState", Encoders.scalaLong)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add a test for other common types such as Double/String etc ?

}

// Getter methods
public String getName() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is a GH thing - but this still seems a little off to me ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a weird style correction happening without me realizing it. Fixed now!

Copy link
Contributor

@anishshri-db anishshri-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm ! thx

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Btw, it might be better to have user facing two APIs for scala-friendly vs java-friendly. See flatMapGroupsWithState methods - providing encoder explicitly is only required to java-friendly API and for scala-friendly API people can use implicit. This could be done as another follow-up JIRA ticket.

@HeartSaVioR
Copy link
Contributor

GA failure happened only in Pyspark connect, and these failed suites do not have relationship with this change.

@HeartSaVioR
Copy link
Contributor

Thanks! Merging to master.

jpcorreia99 pushed a commit to jpcorreia99/spark that referenced this pull request Mar 12, 2024
…sed on SQL encoder for Arbitrary State API v2

### What changes were proposed in this pull request?

In the new operator for arbitrary state-v2, we cannot rely on the session/encoder being available since the initialization for the various state instances happens on the executors. Hence, for the state serialization, we propose to let user explicitly pass in encoder for state variable and serialize primitives/case classes/POJO with SQL encoder. Leveraging SQL encoder can speed up the serialization.

### Why are the changes needed?

These changes are needed for providing a dedicated serializer for state-v2.
The changes are part of the work around adding new stateful streaming operator for arbitrary state mgmt that provides a bunch of new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939

### Does this PR introduce _any_ user-facing change?

Users will need to specify the SQL encoder for their state variable:
`def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T]`
`def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T]`

For primitive type, Encoder is something as: `Encoders.scalaLong`; for case class, `Encoders.product[CaseClass]`; for POJO, `Encoders.bean(classOf[POJOClass])`

### How was this patch tested?

Unit tests for primitives, case classes, POJO separately in `ValueStateSuite`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45447 from jingz-db/sql-encoder-state-v2.

Authored-by: jingz-db <jing.zhan@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@jingz-db
Copy link
Contributor Author

+1

Btw, it might be better to have user facing two APIs for scala-friendly vs java-friendly. See flatMapGroupsWithState methods - providing encoder explicitly is only required to java-friendly API and for scala-friendly API people can use implicit. This could be done as another follow-up JIRA ticket.

Created Task here: https://issues.apache.org/jira/browse/SPARK-47403

sweisdb pushed a commit to sweisdb/spark that referenced this pull request Apr 1, 2024
…sed on SQL encoder for Arbitrary State API v2

### What changes were proposed in this pull request?

In the new operator for arbitrary state-v2, we cannot rely on the session/encoder being available since the initialization for the various state instances happens on the executors. Hence, for the state serialization, we propose to let user explicitly pass in encoder for state variable and serialize primitives/case classes/POJO with SQL encoder. Leveraging SQL encoder can speed up the serialization.

### Why are the changes needed?

These changes are needed for providing a dedicated serializer for state-v2.
The changes are part of the work around adding new stateful streaming operator for arbitrary state mgmt that provides a bunch of new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939

### Does this PR introduce _any_ user-facing change?

Users will need to specify the SQL encoder for their state variable:
`def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T]`
`def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T]`

For primitive type, Encoder is something as: `Encoders.scalaLong`; for case class, `Encoders.product[CaseClass]`; for POJO, `Encoders.bean(classOf[POJOClass])`

### How was this patch tested?

Unit tests for primitives, case classes, POJO separately in `ValueStateSuite`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45447 from jingz-db/sql-encoder-state-v2.

Authored-by: jingz-db <jing.zhan@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants