Skip to content

Commit

Permalink
[SPARK-35659][SS] Avoid write null to StateStore
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This patch removes the code paths that put a null value into `StateStore`.

### Why are the changes needed?

According to the `get` method doc in the `StateStore` API, it returns a non-null row if the key exists, so we should avoid writing null to `StateStore`. Otherwise, you cannot distinguish whether a returned null row means the key doesn't exist or the stored value is actually null. And given the defined behavior of `get`, it is quite easy to cause an NPE when a caller that believes the key exists does not expect to get null back.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added test.

Closes #32796 from viirya/fix-ss-joinstatemanager.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(cherry picked from commit 1226b9b)
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
  • Loading branch information
viirya committed Jun 8, 2021
1 parent 9288649 commit 7432057
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
Expand Up @@ -122,6 +122,7 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
}

override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
require(value != null, "Cannot put a null value")
verify(state == UPDATING, "Cannot put after already committed or aborted")
val keyCopy = key.copy()
val valueCopy = value.copy()
Expand Down
Expand Up @@ -99,8 +99,8 @@ trait ReadStateStore {
trait StateStore extends ReadStateStore {

/**
* Put a new value for a non-null key. Implementations must be aware that the UnsafeRows in
* the params can be reused, and must make copies of the data as needed for persistence.
* Put a new non-null value for a non-null key. Implementations must be aware that the UnsafeRows
* in the params can be reused, and must make copies of the data as needed for persistence.
*/
def put(key: UnsafeRow, value: UnsafeRow): Unit

Expand Down
Expand Up @@ -269,18 +269,14 @@ class SymmetricHashJoinStateManager(
// The backing store is arraylike - we as the caller are responsible for filling back in
// any hole. So we swap the last element into the hole and decrement numValues to shorten.
// clean
if (numValues > 1) {
if (index != numValues - 1) {
val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1)
if (valuePairAtMaxIndex != null) {
keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
valuePairAtMaxIndex.matched)
} else {
keyWithIndexToValue.put(currentKey, index, null, false)
}
keyWithIndexToValue.remove(currentKey, numValues - 1)
} else {
keyWithIndexToValue.remove(currentKey, 0)
}
keyWithIndexToValue.remove(currentKey, numValues - 1)
numValues -= 1
valueRemoved = true

Expand Down
Expand Up @@ -1012,6 +1012,19 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
assert(combinedMetrics.customMetrics(customTimingMetric) == 400L)
}

test("SPARK-35659: StateStore.put cannot put null value") {
  val storeProvider = newStoreProvider()

  // A freshly created provider is expected to expose no data yet.
  assert(getLatestData(storeProvider).isEmpty)

  // Putting a null value must fail with an IllegalArgumentException
  // whose message names the problem.
  val stateStore = storeProvider.getStore(0)
  val thrown = intercept[IllegalArgumentException] {
    stateStore.put(stringToRow("key"), null)
  }
  assert(thrown.getMessage.contains("Cannot put a null value"))
}

/** Return a new provider with a random id */
def newStoreProvider(): ProviderClass

Expand Down

0 comments on commit 7432057

Please sign in to comment.