KAFKA-15774: introduce internal StoreFactory #14659

Merged 6 commits on Nov 7, 2023. The diff below shows changes from 5 of the commits.
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -38,6 +38,7 @@
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -520,7 +521,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
*/
public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder) {
Objects.requireNonNull(builder, "builder can't be null");
internalStreamsBuilder.addStateStore(builder);
internalStreamsBuilder.addStateStore(new StoreBuilderWrapper(builder));
return this;
}

@@ -563,7 +564,7 @@ public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> s
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(
storeBuilder,
new StoreBuilderWrapper(storeBuilder),
topic,
new ConsumedInternal<>(consumed),
() -> ProcessorAdapter.adapt(stateUpdateSupplier.get())
@@ -607,7 +608,7 @@ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(
storeBuilder,
new StoreBuilderWrapper(storeBuilder),
topic,
new ConsumedInternal<>(consumed),
stateUpdateSupplier
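Context for the hunks above: the public StreamsBuilder entry points now wrap every user-supplied StoreBuilder in a StoreBuilderWrapper before handing it to the internal builder, and the internal layers switch from StoreBuilder to the new StoreFactory abstraction. Neither class's source appears in this diff, so the following is only a rough sketch of the shape implied by the call sites (a constructor taking a StoreBuilder plus the name() accessor used later in this PR); the real internal types may declare more than this.

import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;

// Rough sketch of the wrapper implied by the call sites in this diff; the real
// StoreFactory and StoreBuilderWrapper are internal Kafka Streams classes whose
// exact members are not shown here.
interface StoreFactory {
    StateStore build();   // materialize the state store when the topology needs it
    String name();        // store name used to connect processors to the store
}

class StoreBuilderWrapper implements StoreFactory {
    private final StoreBuilder<?> builder;

    StoreBuilderWrapper(final StoreBuilder<?> builder) {
        this.builder = builder;
    }

    @Override
    public StateStore build() {
        return builder.build();  // delegate to the wrapped user-supplied builder
    }

    @Override
    public String name() {
        return builder.name();
    }
}

The apparent intent of the indirection is that graph nodes can carry a description of the store to build rather than an eagerly constructed StoreBuilder, which the later CogroupedStreamAggregateBuilder hunks rely on.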
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -535,6 +535,7 @@ public class StreamsConfig extends AbstractConfig {

public static final String ROCKS_DB = "rocksDB";
public static final String IN_MEMORY = "in_memory";
public static final String DEFAULT_DSL_STORE_DEFAULT = ROCKS_DB;

/** {@code default.windowed.key.serde.inner} */
@SuppressWarnings("WeakerAccess")
@@ -1000,7 +1001,7 @@ public class StreamsConfig extends AbstractConfig {
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(DEFAULT_DSL_STORE_CONFIG,
Type.STRING,
ROCKS_DB,
DEFAULT_DSL_STORE_DEFAULT,
in(ROCKS_DB, IN_MEMORY),
Importance.LOW,
DEFAULT_DSL_STORE_DOC)
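This hunk only introduces a named constant for the existing rocksDB default of default.dsl.store. For reference, a minimal sketch of how an application would select the other built-in value; the example class name, application id, and bootstrap servers are placeholders, not part of the PR.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DslStoreConfigExample {                // hypothetical example class
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "store-factory-demo");  // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        // Pick the built-in in-memory DSL store; the default remains rocksDB.
        props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);
        final StreamsConfig config = new StreamsConfig(props);
        System.out.println(config.getString(StreamsConfig.DEFAULT_DSL_STORE_CONFIG));
    }
}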
9 changes: 5 additions & 4 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -34,6 +34,7 @@
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Set;
@@ -775,7 +776,7 @@ public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBu
final String processorName,
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
storeBuilder,
new StoreBuilderWrapper(storeBuilder),
sourceName,
null,
keyDeserializer,
@@ -827,7 +828,7 @@ public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBu
final String processorName,
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
storeBuilder,
new StoreBuilderWrapper(storeBuilder),
sourceName,
timestampExtractor,
keyDeserializer,
@@ -870,7 +871,7 @@ public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> sto
final String processorName,
final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
storeBuilder,
new StoreBuilderWrapper(storeBuilder),
sourceName,
null,
keyDeserializer,
@@ -915,7 +916,7 @@ public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> sto
final String processorName,
final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
storeBuilder,
new StoreBuilderWrapper(storeBuilder),
sourceName,
timestampExtractor,
keyDeserializer,
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -218,10 +218,7 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
}

public Materialized.StoreType parseStoreType() {
if (storeType.equals(IN_MEMORY)) {
return Materialized.StoreType.IN_MEMORY;
}
return Materialized.StoreType.ROCKS_DB;
return Materialized.StoreType.parse(storeType);
}

public boolean isNamedTopology() {
streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -18,6 +18,7 @@

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -69,7 +70,21 @@ public class Materialized<K, V, S extends StateStore> {
// the built-in state store types
public enum StoreType {
ROCKS_DB,
IN_MEMORY
IN_MEMORY;

public static StoreType parse(final String storeType) {
switch (storeType) {
case StreamsConfig.IN_MEMORY:
return StoreType.IN_MEMORY;
case StreamsConfig.ROCKS_DB:
default:
// for backwards compatibility, we ignore invalid store

Contributor: Honestly I would just make this change now, and throw an exception if it's invalid. If you really want to keep the behavior the same for whatever reason, that's fine too, but we should at least log a warning.

Contributor Author: 👌

// types as this is how the code was originally, consider
// cleaning this up and throwing an exception for invalid
// store types
return StoreType.ROCKS_DB;
}
}
}

private Materialized(final StoreSupplier<S> storeSupplier) {
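The review comment above proposes failing fast on an unknown store type, or at least logging a warning, instead of the silent rocksDB fallback that the merged code keeps for backwards compatibility. A sketch of that stricter variant follows; it is not part of the PR, and the helper class and logger below are assumptions for illustration only.

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the stricter parsing discussed in the review; not the merged behavior.
final class StoreTypeParsing {
    private static final Logger LOG = LoggerFactory.getLogger(StoreTypeParsing.class);

    static Materialized.StoreType parseStrict(final String storeType) {
        switch (storeType) {
            case StreamsConfig.IN_MEMORY:
                return Materialized.StoreType.IN_MEMORY;
            case StreamsConfig.ROCKS_DB:
                return Materialized.StoreType.ROCKS_DB;
            default:
                // Variant 1: reject unknown values outright.
                throw new ConfigException(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, storeType,
                        "Unknown DSL store type");
                // Variant 2 (keeps the old fallback but is no longer silent):
                // LOG.warn("Unknown DSL store type '{}', falling back to rocksDB", storeType);
                // return Materialized.StoreType.ROCKS_DB;
        }
    }
}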
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java
@@ -141,7 +141,7 @@ private KTable<K, VOut> doAggregate(final Initializer<VOut> initializer,
groupPatterns,
initializer,
named,
new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
new KeyValueStoreMaterializer<>(materializedInternal),
materializedInternal.keySerde(),
materializedInternal.valueSerde(),
materializedInternal.queryableStoreName(),
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -40,7 +40,7 @@
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.processor.internals.StoreFactory;

class CogroupedStreamAggregateBuilder<K, VOut> {
private final InternalStreamsBuilder builder;
@@ -52,46 +52,46 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
<KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final StoreFactory storeFactory,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final boolean isOutputVersioned) {
processRepartitions(groupPatterns, storeBuilder);
processRepartitions(groupPatterns, storeFactory);
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue());
new KStreamAggregate<>(storeFactory.name(), initializer, kGroupedStream.getValue());
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeBuilder,
storeFactory,
parentProcessor);
statefulProcessorNode.setOutputVersioned(isOutputVersioned);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name());
}

@SuppressWarnings("unchecked")
<KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final StoreFactory storeFactory,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows) {
processRepartitions(groupPatterns, storeBuilder);
processRepartitions(groupPatterns, storeFactory);

final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
@@ -101,7 +101,7 @@ <KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamWindowAggregate<K, K, VOut, W>(
windows,
storeBuilder.name(),
storeFactory.name(),
EmitStrategy.onWindowUpdate(),
initializer,
kGroupedStream.getValue());
@@ -112,26 +112,26 @@ <KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeBuilder,
storeFactory,
parentProcessor);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name());
}

@SuppressWarnings("unchecked")
<KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final StoreFactory storeFactory,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
processRepartitions(groupPatterns, storeBuilder);
processRepartitions(groupPatterns, storeFactory);
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
@@ -140,7 +140,7 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSessionWindowAggregate<K, K, VOut>(
sessionWindows,
storeBuilder.name(),
storeFactory.name(),
EmitStrategy.onWindowUpdate(),
initializer,
kGroupedStream.getValue(),
@@ -152,25 +152,25 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeBuilder,
storeFactory,
parentProcessor);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name());
}

@SuppressWarnings("unchecked")
<KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final StoreFactory storeFactory,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final SlidingWindows slidingWindows) {
processRepartitions(groupPatterns, storeBuilder);
processRepartitions(groupPatterns, storeFactory);
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
@@ -179,7 +179,7 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, ?, K, ?>) new KStreamSlidingWindowAggregate<K, K, VOut>(
slidingWindows,
storeBuilder.name(),
storeFactory.name(),
// TODO: We do not have other emit policies for co-group yet
EmitStrategy.onWindowUpdate(),
initializer,
@@ -191,25 +191,25 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeBuilder,
storeFactory,
parentProcessor);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.name());
}

private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final StoreBuilder<?> storeBuilder) {
final StoreFactory storeFactory) {
for (final KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) {

if (repartitionReqs.repartitionRequired) {

final OptimizableRepartitionNodeBuilder<K, ?> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();

final String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ?
repartitionReqs.userProvidedRepartitionTopicName : storeBuilder.name();
repartitionReqs.userProvidedRepartitionTopicName : storeFactory.name();

createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);

@@ -263,22 +263,22 @@ <KR, VIn> KTable<KR, VOut> createTable(final Collection<GraphNode> processors,

private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String processorName,
final boolean stateCreated,
final StoreBuilder<?> storeBuilder,
final StoreFactory storeFactory,
final ProcessorSupplier<K, ?, K, ?> kStreamAggregate) {
final StatefulProcessorNode<K, ?> statefulProcessorNode;
if (!stateCreated) {
statefulProcessorNode =
new StatefulProcessorNode<>(
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
storeBuilder
storeFactory
);
} else {
statefulProcessorNode =
new StatefulProcessorNode<>(
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
new String[]{storeBuilder.name()}
new String[]{storeFactory.name()}
);
}
