[SS][WIP] Serialization using case classes/primitives/POJO based on Avro for Arbitrary State API v2. #44989
Closed
jingz-db wants to merge 15 commits into apache:master
anishshri-db (Contributor) reviewed on Feb 2, 2024:
      * @return - instance of ValueState of type T that can be used to store state persistently
      */
    - def getValueState[T](stateName: String): ValueState[T]
    + def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T]
Add the new parameter to the function comment.
anishshri-db (Contributor) reviewed on Feb 2, 2024:
      }

    - private[avro] object AvroFileFormat {
    + private[spark] object AvroFileFormat {
Can it be private[sql]?
anishshri-db (Contributor) reviewed on Feb 2, 2024:
        keyRow
      }

      def encodeValue[S] (value: S): UnsafeRow = {
Should we remove this later?
anishshri-db (Contributor) reviewed on Feb 2, 2024:
      // case class -> dataType
      val valSchema: StructType = valEnc.schema
      // dataType -> avroType
      val avroType: Schema = SchemaConverters.toAvroType(valSchema)
Convert from Spark SQL schema to Avro schema?
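For readers following the two-step conversion in the snippet above, here is a minimal standalone sketch of it. It assumes the spark-sql and spark-avro modules are on the classpath; the UserState case class and the SchemaConversionSketch object are hypothetical names used only for illustration and are not part of the PR.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Hypothetical state type, used only for illustration.
case class UserState(id: Long, name: String)

object SchemaConversionSketch {
  // case class -> Spark SQL schema, derived from the encoder
  val valEnc: Encoder[UserState] = Encoders.product[UserState]
  val valSchema: StructType = valEnc.schema

  // Spark SQL schema -> Avro schema (a record with one field per column)
  val avroType: Schema = SchemaConverters.toAvroType(valSchema)

  def main(args: Array[String]): Unit =
    println(avroType.toString(true)) // pretty-printed Avro schema JSON
}
```

Deriving the schema this way needs only an Encoder instance, not an active SparkSession.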
anishshri-db (Contributor) reviewed on Feb 2, 2024:
      new GenericDatumWriter[Any](avroType)
      val avroData = avroSerializer.serialize(objRow)
      writer.write(avroData, encoder)
      encoder.flush()
Do we need to call writer.close?
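As background for the question above, the following is a hedged sketch of the write-then-flush pattern using only the Avro library. It is not the PR's code: the schema, the AvroRoundTripSketch object, and the serialize/deserialize helpers are hypothetical. In Avro's Java API, DatumWriter exposes setSchema and write but no close method, and Encoder is Flushable rather than Closeable, so flushing the encoder is what pushes buffered bytes into the underlying stream.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroRoundTripSketch {
  // Hypothetical schema, standing in for the one derived from the value encoder.
  val schema: Schema = SchemaBuilder.record("UserState").fields()
    .requiredLong("id")
    .requiredString("name")
    .endRecord()

  def serialize(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    val writer = new GenericDatumWriter[GenericRecord](schema)
    writer.write(record, encoder)
    // The binary encoder buffers internally; flush before reading the bytes.
    encoder.flush()
    out.toByteArray
  }

  def deserialize(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val reader = new GenericDatumReader[GenericRecord](schema)
    reader.read(null, decoder)
  }

  def main(args: Array[String]): Unit = {
    val record = new GenericData.Record(schema)
    record.put("id", Long.box(7L))
    record.put("name", "alice")
    println(deserialize(serialize(record)))
  }
}
```

Because the sink here is an in-memory ByteArrayOutputStream, there is no external resource to release; a file or network sink would still need its own stream closed.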
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
In the new operator for arbitrary state v2, we cannot rely on the session or encoder being available, because the various state instances are initialized on the executors. In addition, the available encoders support only a limited set of state types. We therefore propose serializing primitives, case classes, and POJOs into Avro bytes for state serialization.
Why are the changes needed?
These changes are needed to provide a dedicated serializer for state v2. Using Avro can speed up serialization, and Avro natively supports schema evolution.
These changes are part of the work to add a new stateful streaming operator for arbitrary state management, which provides the new features listed in the SPIP JIRA: https://issues.apache.org/jira/browse/SPARK-45939
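To illustrate the schema evolution mentioned above, here is a small sketch using only the Avro library. It is an assumption about how evolution could look for state values, not code from this PR: the two schemas, the visits field, and the SchemaEvolutionSketch helpers are hypothetical. Bytes written with an older schema are read with a newer one that adds a field with a default value.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object SchemaEvolutionSketch {
  // Hypothetical v1 schema: the shape the state was originally written with.
  val writerSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"UserState","fields":[
      |  {"name":"id","type":"long"}
      |]}""".stripMargin)

  // Hypothetical v2 schema: adds a field with a default so old bytes stay readable.
  val readerSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"UserState","fields":[
      |  {"name":"id","type":"long"},
      |  {"name":"visits","type":"int","default":0}
      |]}""".stripMargin)

  // Serialize a record with the old (v1) schema.
  def writeV1(id: Long): Array[Byte] = {
    val record = new GenericData.Record(writerSchema)
    record.put("id", Long.box(id))
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](writerSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Read v1 bytes with the v2 schema; Avro resolves the two schemas
  // and fills the missing field from its default.
  def readAsV2(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
    reader.read(null, decoder)
  }
}
```

Note that the reader needs the writer's schema as well as its own, so a state store relying on this would have to keep the schema each value was written with.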
Does this PR introduce any user-facing change?
TODO: depends on whether we want to ask users for value encoders.
How was this patch tested?
Separate unit tests for primitives, case classes, and POJOs.
Was this patch authored or co-authored using generative AI tooling?
No.