
[SPARK-47272][SS] Add MapState implementation for State API v2. #45341

Closed
wants to merge 17 commits

Conversation

jingz-db
Contributor

What changes were proposed in this pull request?

This PR adds the MapState implementation for State API v2. The implementation adds a new encoder/decoder that encodes the grouping key and the user key into a composite key stored in RocksDB, so that a key-value pair can be retrieved for a user-specified key with a single RocksDB get.
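As a rough sketch of the composite-key idea (the helper below is illustrative and is not the PR's actual encoder), the grouping-key bytes and user-key bytes can be concatenated behind a length prefix so one RocksDB get can look up a single user key:

```scala
import java.nio.ByteBuffer

object CompositeKeySketch {
  // Length-prefix the grouping key so the two byte arrays can be split apart
  // again when decoding. This mirrors the idea of a composite key, not the
  // exact layout used by the PR.
  def encode(groupingKey: Array[Byte], userKey: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(4 + groupingKey.length + userKey.length)
    buf.putInt(groupingKey.length)
    buf.put(groupingKey)
    buf.put(userKey)
    buf.array()
  }

  def decode(composite: Array[Byte]): (Array[Byte], Array[Byte]) = {
    val buf = ByteBuffer.wrap(composite)
    val groupingKey = new Array[Byte](buf.getInt())
    buf.get(groupingKey)
    val userKey = new Array[Byte](buf.remaining())
    buf.get(userKey)
    (groupingKey, userKey)
  }
}
```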

Why are the changes needed?

These changes are needed to support map values in the State Store. They are part of the work to add a new stateful streaming operator for arbitrary state management, which provides the new features listed in the SPIP JIRA here: https://issues.apache.org/jira/browse/SPARK-45939

Does this PR introduce any user-facing change?

Yes
This PR introduces a new state type (MapState) that users can use in their Spark streaming queries.

How was this patch tested?

Unit tests in TransformWithMapStateSuite.

Was this patch authored or co-authored using generative AI tooling?

No

@jingz-db jingz-db marked this pull request as ready for review March 4, 2024 19:06
@jingz-db
Contributor Author

jingz-db commented Mar 4, 2024

Thanks Eric for the reviews on my old PR. I've resolved them and already incorporated the changes into this one.

@jingz-db jingz-db changed the title [SS] Add MapState implementation for State API v2. [SPARK-47272][SS] Add MapState implementation for State API v2. Mar 4, 2024
// get grouping key byte array
val keyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()
// get user key byte array
val userKeySerializer = encoderFor(userKeyEnc).createSerializer()
Contributor

Could we reuse this instead of creating a new one each time?
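One possible way to do that (a sketch, assuming the encoder is known when the class is constructed; `serializeUserKey` and the type parameter `K` are placeholder names) is to build the serializer once as a field and reuse it:

```scala
// Sketch: create the user-key serializer once instead of calling
// encoderFor(userKeyEnc).createSerializer() on every encode call.
private val userKeySerializer = encoderFor(userKeyEnc).createSerializer()

private def serializeUserKey(userKey: K): Array[Byte] =
  userKeySerializer.apply(userKey).asInstanceOf[UnsafeRow].getBytes()
```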

require(key != null, "User key cannot be null.")
val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key, userKeyExprEnc)
val unsafeRowValue = store.get(encodedCompositeKey, stateName)
if (unsafeRowValue == null) {
Contributor

To be consistent, I think we can return null here, similar to the other state types:

null.asInstanceOf[V]

}
val groupingKey = keyOption.get.asInstanceOf[GK]
// generate grouping key byte array
val groupingKeyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()
Contributor

I guess you can just directly call keyOption.get here?

Contributor Author

The compiler will complain on the next line, keySerializer.apply(groupingKey), because groupingKey would be of type Any if we directly call keyOption.get.
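A minimal standalone illustration of the type issue (all names below are stand-ins, not the PR's code):

```scala
object GroupingKeyTypeExample {
  type GK = String                                            // stand-in for the grouping-key type
  val keySerializer: GK => Array[Byte] = _.getBytes("UTF-8")  // stand-in serializer

  // The implicit key tracker exposes the key as Option[Any].
  val keyOption: Option[Any] = Some("someGroupingKey")

  // keySerializer(keyOption.get)                      // does not compile: found Any, required GK
  val groupingKey: GK = keyOption.get.asInstanceOf[GK] // explicit cast restores the expected type
  val keyBytes: Array[Byte] = keySerializer(groupingKey)
}
```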

}
}

object CompositeKeyStateEncoder {
Contributor

Hmm - do we really need these singleton objects? Could we just call new CompositeKeyStateEncoder instead?
cc @HeartSaVioR - is this the preferred pattern for Spark?

Contributor Author

I was following Bhuwan's style in the base class. Maybe I am missing something, but I did not find anything relevant in the style guide.

Contributor

If the list of parameters is exactly the same as the default constructor's, let's just use new.
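For illustration (hypothetical class, not the PR's actual encoder), a companion apply adds nothing when it only forwards the same parameters, so callers can use new directly:

```scala
class EncoderSketch(stateName: String, useColumnFamilies: Boolean)

// A companion object like this only forwards to the constructor:
object EncoderSketch {
  def apply(stateName: String, useColumnFamilies: Boolean): EncoderSketch =
    new EncoderSketch(stateName, useColumnFamilies)
}

// ...so the call site can simply be:
// val encoder = new EncoderSketch("mapState", useColumnFamilies = true)
```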

useColumnFamilies = useColumnFamilies)
}

protected def newStoreProviderWithValueState(
Contributor

Can we just rename this function to be generic? I guess the specific state variable schema is managed by the individual type information anyway, right?

import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import org.apache.spark.sql.internal.SQLConf

case class InputMapRow(key: String, action: String, value: (String, String))
Contributor

Could we add a class-level comment describing what this test suite does?

}
}

test("Test put value with null value") {
Contributor

Can we add a test for a batch query using MapState?

@anishshri-db
Contributor

@jingz-db - the test failure seems related?

@jingz-db
Contributor Author

jingz-db commented Mar 6, 2024

> @jingz-db - the test failure seems related?

Weirdly, it passes locally. Let me resolve your comments, retrigger the CI, and see if it still fails. Thanks for the review!

@anishshri-db
Contributor

@jingz-db - could you please fix this style error?

[info] compiling 1 Scala source to /home/runner/work/spark/spark/tools/target/scala-2.13/classes ...
[error] /home/runner/work/spark/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:195:0: Whitespace at end of line
[info]   Compilation completed in 14.316s.

@anishshri-db (Contributor) left a comment

LGTM! Pending CI completion with success.

@HeartSaVioR (Contributor) left a comment

Mostly looks good. Left some comments.

* @tparam V - type of value for map state variable
* @return - instance of MapState of type [K,V] that can be used to store state persistently
*/
def getMapState[K, V](stateName: String, userKeyEnc: Encoder[K]): MapState[K, V]
Contributor

Just a friendly reminder: I expect the value encoder will be added as a parameter as well once this is rebased.
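A hedged sketch of how the signature might look after the rebase (the parameter name valEncoder is an assumption):

```scala
// Assumed post-rebase shape: the value encoder is passed explicitly
// alongside the user-key encoder.
def getMapState[K, V](
    stateName: String,
    userKeyEnc: Encoder[K],
    valEncoder: Encoder[V]): MapState[K, V]
```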

store.prefixScan(encodedGroupingKey, stateName)
.map {
case iter: UnsafeRowPair =>
(stateTypesEncoder.decodeCompositeKey(iter.key),
Contributor

Note: as you will rebase with #45447, UnsafeProjection will reuse the row instance, so we can't store the row persistently unless we copy it. If we do copy, we probably want to reduce the scope to key-value vs key vs value.

Maybe it would be good to have a private method that decodes key and value into an iterator without creating a map. Each method can get the iterator from the new method, pick key / value / both, and then copy the rows.
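A rough sketch of that suggestion (the helper and the decoder/encoder method names are assumptions, not the PR's exact API), where one private method exposes the prefix-scan iterator and each public method projects and defensively copies what it needs:

```scala
// Hypothetical shape: one private method returns the raw prefix-scan iterator;
// public methods decode key / value / both, copying rows defensively because
// the underlying store may reuse the row instances.
private def unsafeRowPairs(): Iterator[UnsafeRowPair] = {
  // Assumed helper: the grouping key is re-encoded from the implicit key tracker per call.
  val encodedGroupingKey = stateTypesEncoder.encodeGroupingKey()
  store.prefixScan(encodedGroupingKey, stateName)
}

def iterator(): Iterator[(K, V)] = unsafeRowPairs().map { pair =>
  (stateTypesEncoder.decodeCompositeKey(pair.key.copy()),
    stateTypesEncoder.decodeValue(pair.value.copy()))
}

def keys(): Iterator[K] =
  unsafeRowPairs().map(pair => stateTypesEncoder.decodeCompositeKey(pair.key.copy()))

def values(): Iterator[V] =
  unsafeRowPairs().map(pair => stateTypesEncoder.decodeValue(pair.value.copy()))
```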

Contributor Author

Thanks for the input! I'm not sure if I understand you correctly - are you saying that we want to return an Iterator instead of a Map to reduce the copying, and that we need to use different reused rows for key/value in StateTypesEncoder?

Contributor Author

Hi @anishshri-db, I need your input on this: do we want to return a Map or an Iterator from the getMap function?
I talked with Jungtaek on Slack; if we decide to return a Map, we'll probably need to materialize the map and copy everything in it into memory (because we reuse the UnsafeRow in StateTypesEncoder), so Jungtaek is concerned about the case where we have a large map. I also feel that returning an Iterator makes more sense, because ListState also returns an Iterator from its get list function.

Contributor

Yeah, sure - let's use the iterator approach. Thanks!

}
}

object CompositeKeyStateEncoder {
Contributor

If the list of parameters is exactly the same as the default constructor's, let's just use new.

testStream(result, OutputMode.Update())(
AddData(inputData, inputMapRow),
ExpectFailure[SparkIllegalArgumentException](e => {
assert(e.getMessage.contains("ILLEGAL_STATE_STORE_VALUE.NULL_VALUE"))
Contributor

Could we verify the exception with checkError?
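For example, a hedged sketch of what that could look like (the parameters map and its values are assumptions about this error class):

```scala
// Sketch: assert on the error class via checkError instead of matching
// on the message string. Parameter names/values below are assumptions.
ExpectFailure[SparkIllegalArgumentException](e => {
  checkError(
    exception = e.asInstanceOf[SparkIllegalArgumentException],
    errorClass = "ILLEGAL_STATE_STORE_VALUE.NULL_VALUE",
    parameters = Map("stateName" -> "mapState"))
})
```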

@@ -67,6 +67,7 @@ class MapStateSuite extends StateVariableSuiteBase {
assert(!testState.exists())
assert(testState.getMap().hasNext === false)
}
ImplicitGroupingKeyTracker.removeImplicitKey()
Contributor

Maybe just do this in setup (before) or teardown (after) in StateVariableSuiteBase, to ensure the cleanup is guaranteed to run - otherwise a failure in test A won't clean up the thread local and will introduce another failure. We'd like to avoid cascading failures.
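A sketch of what that could look like in the shared base (assuming the suite can mix in ScalaTest's BeforeAndAfterEach; class name and method placement are illustrative):

```scala
import org.scalatest.BeforeAndAfterEach

abstract class StateVariableSuiteBaseSketch extends SparkFunSuite with BeforeAndAfterEach {
  override def afterEach(): Unit = {
    try {
      // Always drop the thread-local grouping key so a failing test
      // cannot leak state into the next one (avoids cascading failures).
      ImplicitGroupingKeyTracker.removeImplicitKey()
    } finally {
      super.afterEach()
    }
  }
}
```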

Contributor Author

Done! Thanks Jungtaek!

}.toMap
val pairsIterator = store.prefixScan(encodedGroupingKey, stateName)

new Iterator[(K, V)] {
Contributor

Maybe a final nit: your previous code just works and seems much simpler. We just need to remove the .toMap call and we're done.

Also, maybe rename the method to remove "map" from the name?

  • getIterator - consistent with getKeys/getValues
  • iterator - consistent with the Map collection; we'd need to change other methods as well, e.g. getKeys to keys and getValues to values
  • get() - consistent with the other state types; the value of the original type is retrieved with get() consistently

Contributor

It's OK to defer the method-name change to a follow-up PR if we want time to figure out the best name. The first one is something that is good to do before merging the code.

Contributor Author

Thanks Jungtaek! As discussed, I renamed getMap() to iterator(), and renamed getKeys() and getValues() to keys() and values() accordingly.
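A hedged sketch of the resulting MapState surface after the renames (signatures are approximate and may differ from the merged code):

```scala
// Approximate shape of the map state interface after renaming; the exact
// merged signatures may differ.
trait MapStateSketch[K, V] extends Serializable {
  def exists(): Boolean
  def getValue(key: K): V
  def containsKey(key: K): Boolean
  def updateValue(key: K, value: V): Unit
  def iterator(): Iterator[(K, V)]   // previously getMap()
  def keys(): Iterator[K]            // previously getKeys()
  def values(): Iterator[V]          // previously getValues()
  def removeKey(key: K): Unit
  def clear(): Unit
}
```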

@jingz-db
Contributor Author

The test suite failure (in PySpark) seems unrelated.

@HeartSaVioR
Contributor

Yeah, it doesn't look related. I'm ignoring the CI error.

@HeartSaVioR (Contributor) left a comment

+1

@HeartSaVioR
Contributor

Thanks! Merging to master.

sweisdb pushed a commit to sweisdb/spark that referenced this pull request Apr 1, 2024

Closes apache#45341 from jingz-db/map-state-impl.

Lead-authored-by: jingz-db <jing.zhan@databricks.com>
Co-authored-by: Jing Zhan <135738831+jingz-db@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>