KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (1/N)#21572
KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (1/N)#21572frankvicky merged 28 commits intoapache:trunkfrom
Conversation
| @Test | ||
| public void shouldNotBuildHeadersAwareStoreWithCachingEvenIfExplicitlySet() { | ||
| doReturn("headers") | ||
| .when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG); | ||
|
|
||
| final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = | ||
| new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store").withCachingEnabled(), nameProvider, STORE_PREFIX); | ||
|
|
||
| final TimestampedKeyValueStoreWithHeaders<String, String> store = getHeadersAwareStore(materialized); | ||
|
|
||
| final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); | ||
| assertThat(store, instanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class)); | ||
| assertThat(logging, instanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class)); | ||
| // Verify that caching layer was NOT added | ||
| assertThat(logging, not(instanceOf(CachingKeyValueStore.class))); | ||
| } |
There was a problem hiding this comment.
It seems that the test will fail because that we only check cache condition for versioned store ?
https://github.com/aliehsaeedii/kafka/blob/560f78cd966899d21ad94501373bc421204898eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java#L84-L90
There was a problem hiding this comment.
Thanks @frankvicky. Fixed the test. The code is correct but the test does not make sense.
mjsax
left a comment
There was a problem hiding this comment.
Made a pass (skipped tests for now)
| config.originals() | ||
| ); | ||
| } | ||
| if (dslStoreFormat == null) { |
There was a problem hiding this comment.
We did not update the constructor to pass in DslStoreFormat, so it would always be null here atm. Is this something that will change with follow up PRs? Or would we never need to pass via constructor, and this check is not necessary as we would always set it from the config?
There was a problem hiding this comment.
yes we must always set it from the confg. So i remove the if clause.
| } | ||
| if (dslStoreFormat == null) { | ||
| final String dslStoreFormatValue = config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG); | ||
| if (dslStoreFormatValue.equalsIgnoreCase("HEADERS")) { |
There was a problem hiding this comment.
| if (dslStoreFormatValue.equalsIgnoreCase("HEADERS")) { | |
| if (dslStoreFormatValue.equalsIgnoreCase(StreamsConfig.DSL_STORE_FORMAT_HEADERS)) { |
| public static final String DSL_STORE_FORMAT_DEFAULT = "DEFAULT"; | ||
| private static final String DSL_STORE_FORMAT_DOC = "Controls the state store type used by the DSL store supplier (see " + | ||
| "config 'dsl.store.suppliers.class'). " + | ||
| "'default' uses timestamped stores. " + |
There was a problem hiding this comment.
This is not totally correct, right? Or do we want to just ignore this fact? Might be ok as-is; just double checking.
There was a problem hiding this comment.
I changed it to
private static final String DSL_STORE_FORMAT_DOC = "Specifies the state store format for DSL operators. " +
"'DEFAULT' creates either timestamped or plain state stores, depending on context. " +
"'HEADERS' creates headers-aware stores that preserve record headers.";
This is more accurate.
| public static final String DSL_STORE_FORMAT_CONFIG = "dsl.store.format"; | ||
| public static final String DSL_STORE_FORMAT_DEFAULT = "DEFAULT"; | ||
| private static final String DSL_STORE_FORMAT_DOC = "Controls the state store type used by the DSL store supplier (see " + | ||
| "config 'dsl.store.suppliers.class'). " + |
There was a problem hiding this comment.
| "config 'dsl.store.suppliers.class'). " + | |
| "config '" + DSL_STORE_SUPPLIER_CONFIG + "'). " + |
| ? Stores.persistentTimestampedKeyValueStore(params.name()) | ||
| : Stores.persistentKeyValueStore(params.name()); | ||
| final DslStoreFormat storeFormat = params.dslStoreFormat(); | ||
| if (storeFormat.equals(DslStoreFormat.HEADERS)) { |
There was a problem hiding this comment.
| if (storeFormat.equals(DslStoreFormat.HEADERS)) { | |
| if (storeFormat == DslStoreFormat.HEADERS) { |
There was a problem hiding this comment.
Or maybe even use switch (storeFormat) ?
| if (dslStoreFormatValue.equalsIgnoreCase("HEADERS")) { | ||
| dslStoreFormat = DslStoreFormat.HEADERS; | ||
| } else { // DEFAULT | ||
| dslStoreFormat = DslStoreFormat.TIMESTAMPED; |
There was a problem hiding this comment.
This logic is too generic -- it works for most DSL operators but some need to translate "DEFAULT" to PLAIN.
I don't think we can make this translation here, but need to push it down a lower layer where it's know how to translate it?
| materialized.keySerde(), | ||
| materialized.valueSerde()); | ||
| } else { | ||
| } else if (storeFormat.equals(DslStoreFormat.HEADERS)) { |
There was a problem hiding this comment.
| } else if (storeFormat.equals(DslStoreFormat.HEADERS)) { | |
| } else if (storeFormat == DslStoreFormat.HEADERS) { |
There was a problem hiding this comment.
Similar elsewhere. Enums are not object; we can use ==
| supplier, | ||
| materialized.keySerde(), | ||
| materialized.valueSerde()); | ||
| } else if (storeFormat.equals(DslStoreFormat.PLAIN)) { |
There was a problem hiding this comment.
Given the current code-setup, dslStoreFormat() would never return PLAIN, right? So this case is currently not possible?
| final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde()); | ||
|
|
||
| final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, false); | ||
| final DslStoreFormat storeFormat = dslStoreFormat() == DslStoreFormat.HEADERS ? dslStoreFormat() : DslStoreFormat.PLAIN; |
There was a problem hiding this comment.
Looks not clean to me, to override TIMESTAMPED as PLAIN here?
| * @param dslStoreFormat the format of the state store, see ({@link DslStoreFormat} | ||
| */ | ||
| public DslKeyValueParams(final String name, final DslStoreFormat dslStoreFormat) { | ||
| Objects.requireNonNull(name); |
| * @param name the name of the store (cannot be {@code null}) | ||
| * @param isTimestamped whether the returned stores should be timestamped, see ({@link TimestampedKeyValueStore} | ||
| */ | ||
| @Deprecated |
we keep the constructor but we deprecate it as it is listed in the KIP (kip-1285). |
| if (dslStoreFormatValue.equalsIgnoreCase(StreamsConfig.DSL_STORE_FORMAT_HEADERS)) { | ||
| dslStoreFormat = DslStoreFormat.HEADERS; | ||
| } | ||
| // else dslStoreFormat remains null and the lower layers decide between PLAIN and TIMESTAMPED |
There was a problem hiding this comment.
I actually proposed, that we would pass dslStoreFormatValue down to lower layers, and let them translated the String config from dsl.store.format to the DslStoreFormat enum...
But I guess this also work. Feel a little bit less clean to me personally, but maybe not worth to change now that this PR is already merged.
This PR introduces the DslStoreFormat enum and extends DslKeyValueParams
to enable headers-aware key-value stores in the Kafka Streams DSL,
implementing the foundational infrastructure for
KIP-1285.
Reviewers: Matthias J. Sax matthias@confluent.io, TengYao Chi
frankvicky@apache.org