New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15774: introduce internal StoreFactory #14659
Conversation
@@ -520,7 +521,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, | |||
*/ | |||
public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder) { | |||
Objects.requireNonNull(builder, "builder can't be null"); | |||
internalStreamsBuilder.addStateStore(builder); | |||
internalStreamsBuilder.addStateStore(new StoreBuilderWrapper<>(builder)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right off the bat I don't love the name StoreBuilderWrapper
...but I think we both know this was going to be a practice in painful out of control java naming lol
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally tried DelegatingStoreFactory
but settled on this. Open to suggestions :)
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
Outdated
Show resolved
Hide resolved
} | ||
} | ||
nodeGroups = null; | ||
} | ||
|
||
public final <KIn, VIn> void addGlobalStore(final StoreBuilder<?> storeBuilder, | ||
public final <KIn, VIn> void addGlobalStore(final StoreFactory<?> storeBuilder, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: change the parameter name to storeFactory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup, there were lots of places I need to make this change but I didn't want to touch 2 million lines of code until the general strategy was looked at 😆
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very understandable -- just wanted to leave a comment in case you missed it/to help identify some of those two million lines 😭
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Override | ||
public boolean isCompatibleWith(final StoreFactory<?> storeFactory) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know you didn't add this code (even if the method itself is new) but some javadocs or a more descriptive name might be useful here. Hard to understand whether this is/will be working correctly after the changes when it's unclear what its original semantics/purpose were
I guess this is related to a fundamental source of confusion I have right now, which is: where does the StoreBuilderWrapper fit into the final plan? Will anything extend it, or will uninitialized/unconfigured StoreFactory implementations somehow be compiled into this class when they finally set up a real StoreBuilder? Like is this class a "storebuilder-wrapper" in the sense that it always wraps an actual configured/initialized StoreBuilder object, or will the builder
field of this class be "delayed" (in which case how do we evaluate if it's compatible, hence how it all relates to this specific question/method)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add a comment. The plan is to add a separate DelayedStoreFactory
which delays the creation of the StoreBuilder
until configure
is called. the isCompatibleWith
method will check to make sure the inner components would generate the same store builder (e.g. the wrapped StoreSuppliers
are identical)
I'll just make that change and add it to this PR to show how it works end-to-end, but that will be a functionality change.
EDIT: actually I can add it w/o changing functionality if I just ignore configure()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just the one small comment for now...looks good on the whole. I assume a similar approach will be taken for window/session stores, though we'll have to implement the Materializer class for each of these since they don't already exist
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
Outdated
Show resolved
Hide resolved
* <li>{@link org.apache.kafka.streams.state.StoreSupplier} is used by the | ||
* DSL to provide preconfigured state stores as well as type-safe stores | ||
* (e.g. {@link org.apache.kafka.streams.state.KeyValueBytesStoreSupplier}. | ||
* Before passing the {@code StoreSupplier} into the </li> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
incomplete sentence?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙄 🤭 😬
import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder; | ||
import org.apache.kafka.streams.state.internals.WindowStoreBuilder; | ||
|
||
public class StoreBuilderWrapper implements StoreFactory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok one more thing: the javadocs explaining the StoreFactory are great, but I think this needs javaodcs as well. Is it just the default implementation of the StoreFactory class? Is it just here to cover the window/session store cases because they don't yet have a Materializer? etc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's here mostly to cover use cases where the user passes in StoreBuilder
directly (via PAPI or DSL#processes funs) - so it's here to stay :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha -- thanks for clarifying. So IIUC it is like a "wrapper" for already-instantiated StoreBuilders? The current name actually makes total sense once I understood what the class does. Of course ideally the reverse is true...that you can understand what the class does based on the name. But maybe it's obvious and I just lack reading comprehension skills 🤷♀️
I can't think of a better name, but I feel better now that there's a description for this. Thanks!
return StoreType.IN_MEMORY; | ||
case StreamsConfig.ROCKS_DB: | ||
default: | ||
// for backwards compatibility, we ignore invalid store |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Honestly I would just make this change now, and throw an exception if it's invalid. If you really want to keep the behavior the same for whatever reason, that's fine too, but we should at least log a warning
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👌
@ableegoldman none of the failing tests seem remotely related and a quick spotcheck shows them passing locally: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -16,30 +16,9 @@ | |||
*/ | |||
package org.apache.kafka.streams.processor.internals; | |||
|
|||
import org.apache.kafka.clients.consumer.OffsetResetStrategy; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please avoid auto-reformatting.
This is a follow up from #14659 that ports the windowed classes to use the StoreFactory abstraction as well. There's a side benefit of not duplicating the materialization code twice for each StreamImpl/CogroupedStreamImpl class as well. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
This PR sets up the necessary prerequisites to respect configurations such as dsl.default.store.type and the dsl.store.suppliers.class (introduced in apache#14648) without requiring them to be passed in to StreamBuilder#new(TopologyConfig) (passing them only into new KafkaStreams(...). It essentially makes StoreBuilder an external-only API and internally it uses the StoreFactory equivalent. It replaces KeyValueStoreMaterializer with an implementation of StoreFactory that creates the store builder only after configure() is called (in a Future PR we will create the missing equivalents for all of the places where the same thing happens for window stores, such as in all the *WindowKStreamImpl classes) Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This is a follow up from apache#14659 that ports the windowed classes to use the StoreFactory abstraction as well. There's a side benefit of not duplicating the materialization code twice for each StreamImpl/CogroupedStreamImpl class as well. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
This PR sets up the necessary prerequisites to respect configurations such as dsl.default.store.type and the dsl.store.suppliers.class (introduced in apache#14648) without requiring them to be passed in to StreamBuilder#new(TopologyConfig) (passing them only into new KafkaStreams(...). It essentially makes StoreBuilder an external-only API and internally it uses the StoreFactory equivalent. It replaces KeyValueStoreMaterializer with an implementation of StoreFactory that creates the store builder only after configure() is called (in a Future PR we will create the missing equivalents for all of the places where the same thing happens for window stores, such as in all the *WindowKStreamImpl classes) Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR sets up the necessary prerequisites to respect configurations such as dsl.default.store.type and the dsl.store.suppliers.class (introduced in apache#14648) without requiring them to be passed in to StreamBuilder#new(TopologyConfig) (passing them only into new KafkaStreams(...). It essentially makes StoreBuilder an external-only API and internally it uses the StoreFactory equivalent. It replaces KeyValueStoreMaterializer with an implementation of StoreFactory that creates the store builder only after configure() is called (in a Future PR we will create the missing equivalents for all of the places where the same thing happens for window stores, such as in all the *WindowKStreamImpl classes) Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org> Co-authored-by: Almog Gavra <almog.gavra@gmail.com>
This is a follow up from apache#14659 that ports the windowed classes to use the StoreFactory abstraction as well. There's a side benefit of not duplicating the materialization code twice for each StreamImpl/CogroupedStreamImpl class as well. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
…) (#233) This is a follow up from apache#14659 that ports the windowed classes to use the StoreFactory abstraction as well. There's a side benefit of not duplicating the materialization code twice for each StreamImpl/CogroupedStreamImpl class as well. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org> Co-authored-by: Almog Gavra <almog.gavra@gmail.com>
This PR sets up the necessary prerequisites to respect configurations such as dsl.default.store.type and the dsl.store.suppliers.class (introduced in apache#14648) without requiring them to be passed in to StreamBuilder#new(TopologyConfig) (passing them only into new KafkaStreams(...). It essentially makes StoreBuilder an external-only API and internally it uses the StoreFactory equivalent. It replaces KeyValueStoreMaterializer with an implementation of StoreFactory that creates the store builder only after configure() is called (in a Future PR we will create the missing equivalents for all of the places where the same thing happens for window stores, such as in all the *WindowKStreamImpl classes) Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This is a follow up from apache#14659 that ports the windowed classes to use the StoreFactory abstraction as well. There's a side benefit of not duplicating the materialization code twice for each StreamImpl/CogroupedStreamImpl class as well. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
This PR sets up the necessary prerequisites to respect configurations such as dsl.default.store.type and the dsl.store.suppliers.class (introduced in apache#14648) without requiring them to be passed in to StreamBuilder#new(TopologyConfig) (passing them only into new KafkaStreams(...). It essentially makes StoreBuilder an external-only API and internally it uses the StoreFactory equivalent. It replaces KeyValueStoreMaterializer with an implementation of StoreFactory that creates the store builder only after configure() is called (in a Future PR we will create the missing equivalents for all of the places where the same thing happens for window stores, such as in all the *WindowKStreamImpl classes) Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This is a follow up from apache#14659 that ports the windowed classes to use the StoreFactory abstraction as well. There's a side benefit of not duplicating the materialization code twice for each StreamImpl/CogroupedStreamImpl class as well. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
This is a follow up from apache#14659 that ports the windowed classes to use the StoreFactory abstraction as well. There's a side benefit of not duplicating the materialization code twice for each StreamImpl/CogroupedStreamImpl class as well. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
Overview
This PR sets up the necessary prerequisites to respect configurations such as
dsl.default.store.type
and thedsl.store.suppliers.class
(introduced in #14648) without requiring them to be passed in toStreamBuilder#new(TopologyConfig)
(passing them only intonew KafkaStreams(...)
.It essentially makes
StoreBuilder
an external-only API and internally it uses theStoreFactory
equivalent. It replacesKeyValueStoreMaterializer
with an implementation ofStoreFactory
that creates the store builder only afterconfigure()
is called (in a Future PR we will create the missing equivalents for all of the places where the same thing happens for window stores, such as in all the*WindowKStreamImpl
classes)Testing
There is no change in functionality for this PR
Review Guide
StoreFactory
and read the JavaDocs, this is an interface representing what used to beInternalTopologyBuilder.StateStoreFactory
StoreBuilderWrapper
is what used to be the implementation ofInternalTopologyBuilder.StateStoreFactory
InternalTopologyBuilder#addStateStore
now takes in aStoreFactory
instead of aStoreBuilder
from everywhere that uses the DSL.StoreBuilder
withStoreBuilderWrapper
. In a future PR allStoreBuilderWrappers
in the DSL will be replaced with a new one that respects the configurations passed in tonew KafkaStreams
Committer Checklist (excluded from commit message)