-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6138 Simplify StreamsBuilder#addGlobalStore #4430
Conversation
deprecated StreamsBuilder#addGlobalStore and InternalStreamsBuilder#addGlobalStore add new StreamsBuilder#addGlobalStore and InternalStreamsBuilder#addGlobalStore without sourceName and processorName as parameter generate sourceName and processorName by using InternalStreamsBuilder#newProcessorName
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Some initial comments.
String globalTopicName = "testGlobalTopic"; | ||
String globalStoreName = "testAddGlobalStore"; | ||
final StreamsBuilder builder = new StreamsBuilder(); | ||
KeyValueStoreBuilder t = new KeyValueStoreBuilder(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line fails in the build and needs a fix.
@@ -476,6 +476,7 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder builder) { | |||
* @throws TopologyException if the processor of state is already registered | |||
*/ | |||
@SuppressWarnings("unchecked") | |||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should extend the JavaDoc, with @deprecated
annotation to, and point to the new method that should be used.
@@ -492,6 +493,39 @@ public synchronized StreamsBuilder addGlobalStore(final StoreBuilder storeBuilde | |||
stateUpdateSupplier); | |||
return this; | |||
} | |||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: missing empty line
@@ -174,7 +174,7 @@ public String newStoreName(final String prefix) { | |||
public synchronized void addStateStore(final StoreBuilder builder) { | |||
internalTopologyBuilder.addStateStore(builder); | |||
} | |||
|
|||
@Deprecated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an internal class -- no need to mark as deprecated.
final String topic, | ||
final ConsumedInternal consumed, | ||
final ProcessorSupplier stateUpdateSupplier) { | ||
// explicitly disable logging for global stores |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should not copy the code from the old addGlobalStore()
but rather call the old addGlobalStore()
passing the generated names.
extend the JavaDoc with @deprecated pointing to new method add missing empty line call old addGlobalStore instead of copy code
Retest this please. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just two nits. Otherwise LGTM.
Call for second review @bbejeck @dguy @guozhangwang
@@ -194,4 +194,19 @@ public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeB | |||
processorName, | |||
stateUpdateSupplier); | |||
} | |||
public synchronized void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: add empty line
@@ -204,4 +204,6 @@ public static InternalTopologyBuilder internalTopologyBuilder(final StreamsBuild | |||
public static Collection<Set<String>> getCopartitionedGroups(final StreamsBuilder builder) { | |||
return builder.internalTopologyBuilder.copartitionGroups(); | |||
} | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: revert
@dguy @bbejeck @guozhangwang Can we get a second review? If not merged today, it won't be included in 1.1. @panuwat-sc You will need to address all comments today so we can get it into 1.1 release. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for PR, LGTM
Retest this please. |
retest this please |
add a missing empty line in InternalStreamsBuilder.java fix revert from last commit(remove two empty line) in StreamsBuilderTest.java
Merged to Thanks for the KIP and patch @panuwat-sc ! |
deprecated StreamsBuilder#addGlobalStore and InternalStreamsBuilder#addGlobalStore
add new StreamsBuilder#addGlobalStore and InternalStreamsBuilder#addGlobalStore without sourceName and processorName as parameter
generate sourceName and processorName by using InternalStreamsBuilder#newProcessorName
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)