[SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider #45038
Conversation
@HeartSaVioR - PTAL, thx !
LGTM. Thanks for separating the encoders; this lets us evolve the key/value encoders independently and use both the multi-valued and prefix key encoders.
Only minor comments. Looks great overall.
@@ -215,7 +240,9 @@ private[sql] class RocksDBStateStoreProvider
     (keySchema.length > numColsPrefixKey), "The number of columns in the key must be " +
       "greater than the number of columns for prefix key!")

-    this.encoder = RocksDBStateEncoder.getEncoder(keySchema, valueSchema, numColsPrefixKey)
+    keyValueEncoderMap.putIfAbsent(StateStore.DEFAULT_COL_FAMILY_NAME,
(Maybe a microbenchmark could tell whether this regresses the default column family only, since every op now does a map lookup with a carefully crafted lock operation, though I'd rather not worry before we see an actual regression.)
Yeah, I didn't worry about it too much, given that provider init likely happens only once for long-lived queries, and we can retain the same provider on the same executor across microbatch executions.
No, what I meant is that existing stateful operators now look up the concurrent map on every op to find the encoder, whereas previously it was just a reference to a field. But ops are relatively very cheap compared to commit as of now, so let's see.
Ah ok - yeah, I mainly didn't want to maintain two data structures for this. But if we find that it's more expensive, we can split out some of the logic for the default col family case.
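To make the lookup-cost discussion above concrete, here is a minimal sketch (not Spark's actual code; `DemoEncoder` and the wiring below are illustrative stand-ins) of resolving the encoder through a `ConcurrentHashMap` on every operation, instead of dereferencing a plain field as before:

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-in for Spark's RocksDBStateEncoder; not the real type.
final case class DemoEncoder(colFamilyName: String)

object EncoderLookupSketch {
  val DefaultColFamilyName = "default"

  // After the change, every get/put resolves the encoder via this map;
  // before, the default encoder was a plain field reference.
  private val keyValueEncoderMap = new ConcurrentHashMap[String, DemoEncoder]()
  keyValueEncoderMap.putIfAbsent(DefaultColFamilyName, DemoEncoder(DefaultColFamilyName))

  def encoderFor(colFamilyName: String): DemoEncoder = {
    val encoder = keyValueEncoderMap.get(colFamilyName)
    require(encoder != null, s"No encoder registered for column family $colFamilyName")
    encoder
  }

  def main(args: Array[String]): Unit = {
    assert(encoderFor(DefaultColFamilyName) == DemoEncoder(DefaultColFamilyName))
    println("default col family encoder resolved")
  }
}
```

The single map keeps one code path for all column families; if profiling ever shows the per-op lookup matters, a cached field for the default column family could be reintroduced.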
+1 pending CI
Thanks! Merging to master.
What changes were proposed in this pull request?
Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider
Why are the changes needed?
This change allows us to specify encoders for keys and values separately and avoid encoding additional bytes. It also allows us to set schemas/encoders for individual column families, which will be required for upcoming changes related to the transformWithState operator (listState, timer changes, etc.).
We are also refactoring a bit here given the upcoming changes, so we propose splitting the key and value encoders.
Key encoders can be of 2 types:
Value encoders can also eventually be of 2 types:
And we now also allow setting schema and getting encoder for each column family.
So after the change, we can potentially allow something like this:
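The example snippet that followed this sentence did not survive extraction. As a hedged sketch only (all types and method names below, such as `setColumnFamilySchema`, `getKeyEncoder`, and `getValueEncoder`, are hypothetical and may not match the provider's actual API), the per-column-family flow could look like:

```scala
import scala.collection.mutable

// All types and method names below are illustrative, not Spark's real API.
final case class DemoSchema(fields: Seq[String])
final case class DemoKeyEncoder(schema: DemoSchema, numColsPrefixKey: Int)
final case class DemoValueEncoder(schema: DemoSchema, multiValued: Boolean)

class DemoStateStoreProvider {
  private val encoders =
    mutable.Map.empty[String, (DemoKeyEncoder, DemoValueEncoder)]

  // Register a key/value schema pair per column family; key and value
  // encoders are now constructed independently of each other.
  def setColumnFamilySchema(
      colFamilyName: String,
      keySchema: DemoSchema,
      valueSchema: DemoSchema,
      numColsPrefixKey: Int = 0,
      multiValued: Boolean = false): Unit = {
    encoders(colFamilyName) =
      (DemoKeyEncoder(keySchema, numColsPrefixKey),
        DemoValueEncoder(valueSchema, multiValued))
  }

  def getKeyEncoder(colFamilyName: String): DemoKeyEncoder =
    encoders(colFamilyName)._1

  def getValueEncoder(colFamilyName: String): DemoValueEncoder =
    encoders(colFamilyName)._2
}

object PerColFamilyDemo {
  def main(args: Array[String]): Unit = {
    val provider = new DemoStateStoreProvider
    provider.setColumnFamilySchema("listState", DemoSchema(Seq("key")),
      DemoSchema(Seq("value")), multiValued = true)
    assert(provider.getValueEncoder("listState").multiValued)
    println("per-col-family encoders registered")
  }
}
```

The point of the sketch is only the shape of the API: schemas are registered per column family, and the key encoder (with its optional prefix-key column count) is chosen independently from the value encoder (single- vs multi-valued).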
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added unit tests
Was this patch authored or co-authored using generative AI tooling?
No