KAFKA-12396 added a nullcheck before trying to retrieve a key #10548

Merged
merged 10 commits into from Apr 30, 2021

Nathan22177
Contributor

No description provided.

@Vasilisck

LGTM

@mjsax added the streams label Apr 16, 2021
@@ -326,6 +326,7 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {

@Override
public synchronized byte[] get(final Bytes key) {
Objects.requireNonNull(key, "key cannot be null");
Member

I think we should add this to MeteredKeyValueStore instead -- this way, we get the same guard for the in-memory key-value store (or other custom stores).

We should also add this check to other methods that accept a key, like put, putIfAbsent etc...

Furthermore, we should also add it for other store types, ie, MeteredTimestampedKeyValueStore, MeteredWindowedStore, MeteredTimestampedWindowStore and MeteredSessionStore.

Last, but not least, we should add corresponding unit tests.
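The idea of guarding at the outer layer can be sketched as follows. This is a minimal illustration, not Kafka code: `SimpleStore`, `MapBackedStore`, and `NullGuardingStore` are hypothetical names, and the real `MeteredKeyValueStore` does considerably more. The point is only that a single check in the wrapper covers every inner store, including ones that would otherwise accept a null key silently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class NullGuardDemo {
    public static void main(final String[] args) {
        final SimpleStore<String, String> store = new NullGuardingStore<>(new MapBackedStore<>());
        store.put("a", "1");
        System.out.println(store.get("a"));
        try {
            store.get(null);
        } catch (final NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical, simplified store interface (not Kafka's KeyValueStore).
interface SimpleStore<K, V> {
    V get(K key);

    void put(K key, V value);
}

// Inner store with no null handling of its own; HashMap would accept a null key.
class MapBackedStore<K, V> implements SimpleStore<K, V> {
    private final Map<K, V> map = new HashMap<>();

    @Override
    public V get(final K key) {
        return map.get(key);
    }

    @Override
    public void put(final K key, final V value) {
        map.put(key, value);
    }
}

// Outer wrapper: every key-accepting method rejects null before delegating.
class NullGuardingStore<K, V> implements SimpleStore<K, V> {
    private final SimpleStore<K, V> inner;

    NullGuardingStore(final SimpleStore<K, V> inner) {
        this.inner = Objects.requireNonNull(inner, "inner cannot be null");
    }

    @Override
    public V get(final K key) {
        Objects.requireNonNull(key, "key cannot be null");
        return inner.get(key);
    }

    @Override
    public void put(final K key, final V value) {
        Objects.requireNonNull(key, "key cannot be null");
        inner.put(key, value);
    }
}
```

Swapping `MapBackedStore` for any other `SimpleStore` implementation keeps the same guard, which is the argument for placing the check in the wrapper rather than in each inner store.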

Contributor Author

On it.

Member

This check can be removed, as we do the check in the "metered" store that will wrap this one.

…test coverage for aforementioned nullchecks
Member

@mjsax left a comment

A few methods are missing:

MeteredKeyValueStore:

  • putAll() (we should iterate over all entries to check whether any key is null)
  • prefixScan() (prefix should not be null either, I guess? cc @guozhangwang @cadonna)
  • range() (both keys)
  • reverseRange() (both keys)

MeteredSessionStore:

  • put() (should verify sessionKey != null and sessionKey.key() != null)
  • remove()
  • fetchSession()
  • fetch(K)
  • backwardFetch(K)
  • fetch(K, K)
  • backwardFetch(K, K)
  • findSession(K, long, long)
  • backwardFindSession
  • findSession(K, K, long, long)
  • backwardFindSession(K, K, long, long)

Thanks for adding tests!

@@ -290,6 +295,7 @@ protected V outerValue(final byte[] value) {
}

protected Bytes keyBytes(final K key) {
Objects.requireNonNull(key, "key cannot be null");
Member

This method is not public, so there is no need to add this check.

@@ -59,6 +61,7 @@


public RawAndDeserializedValue<V> getWithBinary(final K key) {
Objects.requireNonNull(key, "key cannot be null");
Member

Actually not public API -- don't need this check.

@@ -73,6 +76,7 @@
public boolean putIfDifferentValues(final K key,
final ValueAndTimestamp<V> newValue,
final byte[] oldSerializedValue) {
Objects.requireNonNull(key, "key cannot be null");
Member

Actually not public API -- don't need this check.

…MeteredTimestampedKeyValueStore, added new checks to MeteredKeyValueStore, covered new checks and old checks for MeteredSessionStoreTest
@Nathan22177
Contributor Author

@mjsax

MeteredSessionStore:

  • put() (should verify sessionKey != null and sessionKey.key() != null)
  • remove()
  • fetchSession()
  • fetch(K)
  • backwardFetch(K)
  • fetch(K, K)
  • backwardFetch(K, K)
  • findSession(K, long, long)
  • backwardFindSession
  • findSession(K, K, long, long)
  • backwardFindSession(K, K, long, long)

All of them already had the checks. Some lacked test coverage, though. I made changes according to your comments and added some tests.

@mjsax
Member

mjsax commented Apr 21, 2021

All of them already had the checks.

Sweet. I did not double check the code before.

Seems, put(final Windowed<K> sessionKey,...) and remove(final Windowed<K> sessionKey) don't check for sessionKey.key() != null though? Can we add this check?

@cadonna
Contributor

cadonna commented Apr 22, 2021

* prefixScan() (prefix should not be `null` either I guess? \cc @guozhangwang @cadonna

Yes! Actually, the implementations in RocksDBStore and InMemoryKeyValueStore already have a null check.

   public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
                                                                                    final PS prefixKeySerializer) {
        Objects.requireNonNull(prefix, "prefix cannot be null");
        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");

I think it would make sense to move them to the MeteredKeyValueStore.

@Nathan22177
Contributor Author

Nathan22177 commented Apr 22, 2021

Seems, put(final Windowed<K> sessionKey,...) and remove(final Windowed<K> sessionKey) don't check for sessionKey.key() != null though? Can we add this check?

They do? Since 2017 according to annotations? Might be that we're talking about different classes?

@Nathan22177
Contributor Author

And both are covered with tests.

@mjsax
Member

mjsax commented Apr 22, 2021

They do? Since 2017 according to annotations? Might be that we're talking about different classes?

The input parameter is Windowed<K> sessionKey and the check is if sessionKey != null -- however, sessionKey wraps a key object, and my ask was to also check that the wrapped key is not null, ie, to add a sessionKey.key() != null check (note the .key() call that extracts the wrapped key from the sessionKey parameter). Sorry for the confusion.

@Nathan22177
Contributor Author

Nathan22177 commented Apr 22, 2021

Ooohhhh. I see. Yea, sorry, I also completely overlooked the wrapped key part. I'll fix it.

Member

@mjsax left a comment

Thanks for staying on top of this! Seems we find more corner cases while reviewing... A few last comments.

@@ -219,11 +224,19 @@ public V putIfAbsent(final K key,

@Override
public void putAll(final List<KeyValue<K, V>> entries) {
final List<KeyValue<K, V>> possiblyNullKeys = entries
Member

I think we could simplify this to a one liner?

entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot be null"));
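The suggested one-liner can be tried in isolation with a small sketch. `Pair` below is a hypothetical stand-in for the entry type in the diff (assumed here to expose a public `key` field), and `requireNonNullKeys` is an illustrative helper name, not a method from the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class PutAllKeyCheckDemo {

    // Hypothetical minimal pair type with a public `key` field.
    static final class Pair<K, V> {
        final K key;
        final V value;

        Pair(final K key, final V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Rejects the whole batch as soon as one entry carries a null key.
    static <K, V> void requireNonNullKeys(final List<Pair<K, V>> entries) {
        entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot be null"));
    }

    public static void main(final String[] args) {
        final List<Pair<String, String>> ok =
            Arrays.asList(new Pair<>("a", "1"), new Pair<>("b", "2"));
        requireNonNullKeys(ok); // passes silently

        final List<Pair<String, String>> bad =
            Arrays.asList(new Pair<>("a", "1"), new Pair<>(null, "2"));
        try {
            requireNonNullKeys(bad);
        } catch (final NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that a null entry in the list (as opposed to an entry with a null key) would also raise a NullPointerException here, but from the field access and without the custom message.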

@@ -234,13 +247,15 @@ public V delete(final K key) {

@Override
public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix, final PS prefixKeySerializer) {

Objects.requireNonNull(prefix, "key cannot be null");
Member

As mentioned by @cadonna, the wrapped stores also check prefixKeySerializer for null -- thus it might be good to move both checks here.

I think we can also remove both checks in RocksDBStore and InMemoryKeyValueStore -- they seem to be redundant now?

Contributor Author

I think we can also remove both checks in RocksDBStore and InMemoryKeyValueStore -- they seem to be redundant now?

They are both different implementations, aren't they?
I don't understand how they will be checked if we only leave it in MeteredKeyValueStore.

Contributor Author

I wrote a quick test to see whether it still throws an NPE in InMemoryKeyValueStore without the check. It did. I am confused, but they are, indeed, redundant.
I'll leave the tests in both RocksDBStoreTest and InMemoryKeyValueStore because why not.

Member

@mjsax Apr 25, 2021

KafkaStreams runtime always "wraps" any store with a corresponding MeteredXxxStore (cf https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java#L42-L49) -- those MeteredXxxStores do the translation from objects to bytes (ie they use the serdes) and also track state store metrics. (Note that stores provided to the runtime always have type <Bytes, byte[]> while they are exposed to Processors as <K,V> types.)

Thus, when you call context.stateStore(...) you always get a MeteredXxxStore object -- of course, those details are hidden behind the interface type.

This architecture allows us to unify code and separate concerns. In fact, it also allows us to add/remove more "layers": we can insert a "caching layer" (cf. https://kafka.apache.org/28/documentation/streams/developer-guide/memory-mgmt.html) and a "change logging layer" (both are inserted by default).
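A rough sketch of this layering, under simplifying assumptions: `BytesLayerSketch` and `TypedLayerSketch` are hypothetical names, UTF-8 string conversion stands in for real serdes, and a plain counter stands in for metrics. The caching and change-logging layers are omitted. It is meant only to show why a null check in the outer layer protects every inner store.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class LayeredStoreDemo {
    public static void main(final String[] args) {
        final TypedLayerSketch store = new TypedLayerSketch(new BytesLayerSketch());
        store.put("k", "v");
        System.out.println(store.get("k") + " after " + store.calls() + " calls");
    }
}

// Hypothetical inner layer: sees only bytes, has no serdes and no null checks.
class BytesLayerSketch {
    // ByteBuffer keys compare by content, unlike raw byte[] keys.
    private final Map<ByteBuffer, byte[]> data = new HashMap<>();

    void put(final byte[] key, final byte[] value) {
        data.put(ByteBuffer.wrap(key), value);
    }

    byte[] get(final byte[] key) {
        return data.get(ByteBuffer.wrap(key));
    }
}

// Hypothetical outer layer: typed API, null checks, object-to-bytes translation, call counting.
class TypedLayerSketch {
    private final BytesLayerSketch inner;
    private long calls = 0;

    TypedLayerSketch(final BytesLayerSketch inner) {
        this.inner = Objects.requireNonNull(inner, "inner cannot be null");
    }

    void put(final String key, final String value) {
        Objects.requireNonNull(key, "key cannot be null");
        calls++;
        inner.put(toBytes(key), value == null ? null : toBytes(value));
    }

    String get(final String key) {
        Objects.requireNonNull(key, "key cannot be null");
        calls++;
        final byte[] raw = inner.get(toBytes(key));
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    long calls() {
        return calls;
    }

    private static byte[] toBytes(final String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

Because callers only ever hold the outer object, the inner layer never sees a null key, which is why checks duplicated in the inner stores become redundant.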

Contributor Author

Oooooh, I see.

@@ -472,11 +472,26 @@ public void shouldThrowNullPointerOnRemoveIfKeyIsNull() {
assertThrows(NullPointerException.class, () -> store.remove(null));
}

@Test
public void shouldThrowNullPointerOnPutIfWrappedKeyIsNull() {
assertThrows(NullPointerException.class, () -> store.put(new Windowed<>(null, new SessionWindow(0, 0)), "a"));
Member

This test reminds me that the wrapped SessionWindow should not be null either.
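The three levels of checking raised in this thread (the wrapper itself, its wrapped key, and its wrapped window) might look like the sketch below. `KeyAndWindow` is a hypothetical stand-in for `Windowed<K>`, and `requireComplete` and the message strings are illustrative, not the ones used in the PR.

```java
import java.util.Objects;

class WrappedKeyCheckDemo {

    // Hypothetical stand-in for a windowed key: either wrapped part may be null.
    static final class KeyAndWindow<K, W> {
        private final K key;
        private final W window;

        KeyAndWindow(final K key, final W window) {
            this.key = key;
            this.window = window;
        }

        K key() {
            return key;
        }

        W window() {
            return window;
        }
    }

    // Checks the wrapper first, then each wrapped part.
    static <K, W> void requireComplete(final KeyAndWindow<K, W> sessionKey) {
        Objects.requireNonNull(sessionKey, "sessionKey cannot be null");
        Objects.requireNonNull(sessionKey.key(), "sessionKey.key() cannot be null");
        Objects.requireNonNull(sessionKey.window(), "sessionKey.window() cannot be null");
    }

    public static void main(final String[] args) {
        requireComplete(new KeyAndWindow<>("a", "window")); // passes
        try {
            requireComplete(new KeyAndWindow<>(null, "window"));
        } catch (final NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Distinct messages per level make it clear from the exception alone which part of the wrapper was missing.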

nathan22177 added 2 commits April 23, 2021 01:50
@mjsax
Member

mjsax commented Apr 25, 2021

Seems there is some checkstyle error:

[2021-04-22T23:02:56.789Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-10548/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java:35:8: Unused import - java.util.Objects. [UnusedImports]

@@ -402,6 +402,11 @@ public void shouldReturnKeysWithGivenPrefix() {
assertThat(valuesWithPrefix.get(2), is("b"));
}

@Test
public void shouldThrowNullPointerIfPrefixKeySerializerIsNull() {
Member

This test fails now, as the check was removed. No need to add this test any longer in this class.

@mjsax
Member

mjsax commented Apr 26, 2021

To avoid checkstyle and test issues, and reduce review turn-around time, I would recommend running unit tests and checkstyle locally before pushing updates: ./gradlew clean streams:unitTest streams:checkstyleMain streams:checkstyleTest

@Nathan22177
Contributor Author

To avoid checkstyle and test issues, and reduce review turn-around time, I would recommend running unit tests and checkstyle locally before pushing updates: ./gradlew clean streams:unitTest streams:checkstyleMain streams:checkstyleTest

Yeah, sorry, I forget to do that sometimes.

@mjsax merged commit e454bec into apache:trunk Apr 30, 2021
@mjsax
Member

mjsax commented Apr 30, 2021

Thanks for the PR @Nathan22177! Merged to trunk.
