Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5047: NullPointerException while using GlobalKTable in KafkaStreams #2834

Closed
wants to merge 5 commits into from

Conversation

dguy
Copy link
Contributor

@dguy dguy commented Apr 10, 2017

Skip null keys when initializing GlobalKTables. This is inline with what happens during normal processing.

@dguy
Copy link
Contributor Author

dguy commented Apr 10, 2017

@mjsax @enothereska @miguno

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2850/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2854/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2849/
Test FAILed (JDK 7 and Scala 2.10).

@@ -174,7 +174,11 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
for (ConsumerRecord<byte[], byte[]> record : records) {
offset = record.offset() + 1;
stateRestoreCallback.restore(record.key(), record.value());
if (record.key() == null) {
log.warn("null key found at offset %d while restoring state from topic %s and partition %d. Skipping record", record.offset(), record.topic(), record.partition());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could a log level of WARN wreak havoc in case there's a lot of null keys?

I think the log level we pick here should (a) be consistent with what we use elsewhere (not implying it isn't in this PR!) and (b) it should match the guarantees we have documented for global tables. If, for example, we clearly tell the user that null keys are ignored, we could use a less chatty log level than WARN.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i'll make it debug

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually i'm just going to silently drop it as that is what we do elsewhere

stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
stateManager.register(store1, false, stateRestoreCallback);
assertEquals(1, stateRestoreCallback.restored.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not assertThat(...)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we have to compare KeyValue<byte[], byte[]> and assertThat() won't work., i.e., 2 byte[] with the same contents aren't equal. Need to use assertArrayEquals()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Challenge accepted. ;-)

import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class ArrayComparisonTest {

  @Test
  public void shouldCompareByteArrays() throws Exception {
    byte[] first = "foo".getBytes();
    byte[] second = "foo".getBytes();
    assertThat(first).isEqualTo(second);
  }

}

This test passes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AssertJ knows when it is comparing byte arrays.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use that and i'm not adding another dependency

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry -- I thought our assertThat was from AssertJ.

But using JUnit's/Hamcrest's assertThat works, too:

import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;

public class ArrayComparisonTest {

  @Test
  public void shouldCompareByteArrays() throws Exception {
    byte[] first = "foo".getBytes();
    byte[] second = "foo".getBytes();
    assertThat(first, equalTo(second));
  }

}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok -yeah it is actually the KeyValue#equals() that is causing the problem i was having as it calls byte[].equals underneath which doesn't work. Anyway, i can work around it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I didn't want to be picky here, it's just that "someone" (cough) kept telling me in my PRs that we should use assertThat if possible. ;-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hahaha!

stateManager.register(store1, false, stateRestoreCallback);
assertEquals(1, stateRestoreCallback.restored.size());
final KeyValue<byte[], byte[]> restoredKv = stateRestoreCallback.restored.get(0);
assertArrayEquals(expectedKey, restoredKv.key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not assertThat?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

@miguno
Copy link
Contributor

miguno commented Apr 10, 2017

General comment: We should also update the javadocs of GlobalKTable. I'm pretty sure that most users wouldn't expect / know about this.

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2852/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2853/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2857/
Test PASSed (JDK 8 and Scala 2.11).

@enothereska
Copy link
Contributor

@dguy don't we need to do the same check for the normal KTable too?

@dguy
Copy link
Contributor Author

dguy commented Apr 11, 2017

@enothereska For changelogs this will never happen as all key/values that have made it into the changelog will be ok. So restoration is ok. For a KTable that is built via builder.table() then this may happen, now that i think about it. The KTableSource drops null keys, but if it was to be restored it would likely hit the same issue. Thanks!

@miguno
Copy link
Contributor

miguno commented Apr 11, 2017

@dguy (/cc @enothereska): I'd also update the KTable javadocs to cover the null-key behavior/contract.

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Just a general comment: should we add a metric-sensor to report if null-key records get dropped?

@@ -54,6 +54,8 @@
*}</pre>
* Note that in contrast to {@link KTable} a {@code GlobalKTable}'s state holds a full copy of the underlying topic,
* thus all keys can be queried locally.
* <p>
* Records from the source topic that have null keys are dropped
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. . missing at the end.

@@ -51,6 +51,8 @@
* ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
* view.get(key);
*}</pre>
*<p>
* Records from the source topic that have null keys are dropped
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

@dguy
Copy link
Contributor Author

dguy commented Apr 12, 2017

@mjsax that is a good idea, though i i'll file a JIRA for it as i think it applies more broadly than this particular fix. We drop records in various places and don't record it.

https://issues.apache.org/jira/browse/KAFKA-5058

@asfbot
Copy link

asfbot commented Apr 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2906/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented Apr 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2902/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Apr 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2901/
Test PASSed (JDK 7 and Scala 2.10).

@dguy
Copy link
Contributor Author

dguy commented Apr 12, 2017

@enothereska i updated it to check KTable. Can you please make another pass? @ijuma would you mind having a look once @enothereska has given the thumbs up?

@enothereska
Copy link
Contributor

LGTM thanks.

@dguy
Copy link
Contributor Author

dguy commented Apr 20, 2017

@guozhangwang can you please merge?

@asfgit asfgit closed this in 3c471d2 Apr 20, 2017
@guozhangwang
Copy link
Contributor

LGTM and Merged to trunk.

@dguy I tried to cherry-pick to 0.10.2 but the conflicts are quite complex for me to resolve-forward, could you file a separate PR for 0.10.2 also?

@dguy dguy deleted the kafka-5047 branch May 16, 2017 14:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants