Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5651: [FOLLOW-UP] add with method to Materialized #4009

Closed
wants to merge 6 commits into from

Conversation

dguy
Copy link
Contributor

@dguy dguy commented Oct 3, 2017

Add a with(Serde keySerde, Serde valSerde) to Materialized for cases where people don't care about the state store name.

@dguy
Copy link
Contributor Author

dguy commented Oct 3, 2017

@mjsax, @guozhangwang, @bbejeck
If we are ok with this then i'll create a JIRA and add an addendum to the KIP

@mjsax
Copy link
Member

mjsax commented Oct 3, 2017

@dguy As it's part of a KIP, I think a Jira is appropriate.

@dguy
Copy link
Contributor Author

dguy commented Oct 3, 2017

@mjsax, yes i was going to create one if we think this is what we want...

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a meta comment as another way to address this.

@@ -24,15 +24,25 @@

public class KeyValueStoreMaterializer<K, V> {
private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized;
private final InternalNameProvider streamsBuilder;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: variable name to internalNameProvider?

package org.apache.kafka.streams.kstream.internals;

public interface InternalNameProvider {
String newName(String prefix);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newProcessorName?

@@ -139,7 +140,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil
Objects.requireNonNull(materialized, "materialized can't be null");
// explicitly disable logging for global stores
materialized.withLoggingDisabled();
final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
final StoreBuilder storeBuilder = new KeyValueStoreMaterializer<>(materialized, this).materialize(KTableImpl.SOURCE_NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... shouldn't we add KTableImpl.STATE_STORE_NAME also?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT: actually, how about the following: add a protected storeName setter in Materialized, and then we can just add the additional logic in this class by checking the MaterializedInternal object has either storeName or StoreSupplier, and if none is provided call the setter with the store name from newStoreName(SOURCE_NAME).

Then we would not need any other classes' modifications.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure i follow

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be better to pass the InternalNameProvider and prefix into MaterializedInternal at construction as this will be required in other places. That way at least the logic will all be within the MaterializedInternal ctor

@ijuma
Copy link
Contributor

ijuma commented Oct 4, 2017

Btw, you can reuse the same JIRA for the original KIP as well (i.e. a new JIRA is not necessarily required).

@dguy dguy changed the title MINOR: add with method to Materialized KAFKA-5651: [FOLLOW-UP] add with method to Materialized Oct 4, 2017
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One nit.

Also, can you update the JavaDoc in KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde); and KGroupedStream#aggregate(Initializer, Aggregator, Serde)

super(materialized);
this.queryable = queryable;
if (storeName() == null) {
this.queryable = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove this (same below)

@@ -1509,7 +1509,7 @@
* @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
* the latest (rolling) aggregate for each key within a window
* @deprecated use {@link #windowedBy(SessionWindows) windowedBy(sessionWindows)} followed by
* {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
* {@link SessionWindowedKStream#aggregate(Initializer, Aggregator, Merger, Materialized) aggregate(initializer, aggregator, sessionMerger, Materialized.with(keySerde, aggValueSerde))}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be Materialized.with(null, aggValueSerde) -- there is no keySerde parameter in the deprecated method. The idea is to show a 1:1 translation to the new API.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I forgot, there are three aggregate methods in this class that need this update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, in that case we also need to set the keySerde on the Materialized to the one provided in KGroupedStream etc. Otherwise it is going to revert to the defaults, which may not be correct

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Using null implies fall back to default -- that is the same behavior as the deprecated method has (the deprecate method does not take a keySerde argument and thus used the default one)

@@ -855,7 +855,7 @@
* @param <VR> the value type of the aggregated {@link KTable}
* @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
* latest (rolling) aggregate for each key
* @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.as("someStoreName").withValueSerde(aggValueSerde))}
* @deprecated use {@link #aggregate(Initializer, Aggregator, Aggregator, Materialized) aggregate(initializer, adder, subtractor, Materialized.with(keySerdes, aggValueSerde))}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above (it's only one method in this class)

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dguy
Copy link
Contributor Author

dguy commented Oct 5, 2017

retest this please

@guozhangwang
Copy link
Contributor

Hmm.. this one looks new to me:

java.lang.AssertionError: Key not found forest
Stacktrace

java.lang.AssertionError: Key not found forest
	at org.junit.Assert.fail(Assert.java:88)
	at org.apache.kafka.streams.integration.QueryableStateIntegrationTest.verifyGreaterOrEqual(QueryableStateIntegrationTest.java:898)
	at org.apache.kafka.streams.integration.QueryableStateIntegrationTest.concurrentAccesses(QueryableStateIntegrationTest.java:400)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)

does that looks familiar to you guys? If we believe it is not related is there a JIRA for it?

@dguy
Copy link
Contributor Author

dguy commented Oct 5, 2017

We've seen that fail periodically over the past couple of months

@dguy
Copy link
Contributor Author

dguy commented Oct 5, 2017

retest this please

@mjsax
Copy link
Member

mjsax commented Oct 5, 2017

There is an old JIRA with incomplete description... https://issues.apache.org/jira/browse/KAFKA-4263 -- pretty sure, it's unrelated to this PR.

@dguy
Copy link
Contributor Author

dguy commented Oct 6, 2017

retest this please

2 similar comments
@guozhangwang
Copy link
Contributor

retest this please

@guozhangwang
Copy link
Contributor

retest this please

@guozhangwang
Copy link
Contributor

LGTM. Merged to trunk and 1.0.

@asfgit asfgit closed this in 23a0140 Oct 6, 2017
asfgit pushed a commit that referenced this pull request Oct 6, 2017
Add a `with(Serde keySerde, Serde valSerde)` to `Materialized` for cases where people don't care about the state store name.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Matthias J. Sax <matthias@confluent.io>

Closes #4009 from dguy/materialized

(cherry picked from commit 23a0140)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants