Skip to content

Commit

Permalink
few nits
Browse files Browse the repository at this point in the history
  • Loading branch information
jingz-db committed Mar 5, 2024
1 parent aa3fcc1 commit 73821e7
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ private[sql] trait StatefulProcessorHandle extends Serializable {
/**
* Creates new or returns existing map state associated with stateName.
* The MapState persists Key-Value pairs of type [K, V].
*
* @param stateName - name of the state variable
* @param userKeyEnc - spark sql encoder for the map key
* @tparam K - type of key for map state variable
* @tparam V - type of value for map state variable
* @return - instance of MapState of type [K,V] that can be used to store state persistently
*/
def getMapState[K, V](stateName: String, userKeyEnc: Encoder[K]): MapState[K, V]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class MapStateImpl[K, V](
/** Get the state value if it exists */
override def getValue(key: K): V = {
// TODO do we want to reuse this function,
// or create a new error for user key?
// or create a new error for null user key?
StateStoreErrors.requireNonNullStateValue(key, stateName)
val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key)
val unsafeRowValue = store.get(encodedCompositeKey, stateName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class TransformWithMapStateSuite extends StreamTest {

testStream(result, OutputMode.Update())(
AddData(inputData, InputMapRow("k1", "getValue", ("v1", ""))),
CheckAnswer(("k1", "v1", "null"))
CheckAnswer(("k1", "v1", null))
)
}
}
Expand Down

0 comments on commit 73821e7

Please sign in to comment.