New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-8515: (POC) add state store type in KTable for correct materialization #6885
Conversation
@@ -114,7 +114,7 @@ | |||
queryableStoreName, | |||
aggregateSupplier, | |||
statefulProcessorNode, | |||
builder); | |||
builder, ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems you missed passing in the store type? Similar in other classes.
@@ -176,9 +170,10 @@ public String queryableStoreName() { | |||
|
|||
@Override | |||
public KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, | |||
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) { | |||
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) throws UnsupportedDataTypeException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does still enforce passing in a KeyValueStore
-- hence, if the KTable is windowed, validateStateStoreMatchingDataType()
would fail but there is no alternative to pass in a WindowStore
.
It would be a "fix" to avoid unbounded growth of the result table, but still does not allow to materialize a windowed-KTable filter result.
@@ -1002,7 +1003,7 @@ public void kTableNamedMaterializedMapValuesShouldPreserveTopologyStructure() { | |||
final KTable<Object, Object> table = builder.table("input-topic"); | |||
table.mapValues( | |||
(readOnlyKey, value) -> null, | |||
Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as("store-name").withKeySerde(null).withValueSerde(null)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the place where we need to remove type assertion for example
ed66614
to
68217af
Compare
@@ -244,6 +246,45 @@ public void shouldMaterializeAggregated() { | |||
} | |||
} | |||
|
|||
@Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a sample usage to materialize the windowed KTable. The syntax is still not ideal here, but could work.
.withValueSerde(Serdes.Integer())); | ||
final KTableImpl<String, String, Integer> table3 = | ||
(KTableImpl<String, String, Integer>) table1.mapValues( | ||
value -> new Integer(value) * (-1), | ||
Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName3) | ||
Materialized.<String, Integer, StateStore>as(storeName3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In terms of refactoring, user needs to use weak type here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of the trade-offs we take during this POC: ProcessorSupplier
type weakened from <K, V>
to <V>
because we need the capability to return both K
and Windowed<K>
for KTable materialization.
} | ||
|
||
@Override | ||
public Processor<K, Change<V>> get() { | ||
return new KTableFilterProcessor(); | ||
public Processor<?, Change<V>> get() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use KTableFilter
as an example. We need to return different processor upon the state store type for all the supplier get function calls. This is because the assumption of getting a key-value store is no longer valid.
@@ -1138,12 +1137,19 @@ | |||
*/ | |||
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, | |||
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, | |||
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); | |||
final Materialized<K, VR, StateStore> materialized); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The whole KTable API is generified by reducing KeyValueStore to StateStore. Instead of compile time type checking, we pushed the type check internally
|
||
private final String queryableStoreName; | ||
|
||
private final StateStoreType stateStoreType; | ||
|
||
// Needed if given KTable needs to be materialized as window store. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put timeWindow/sessionWindow as optional fields with KTable API. This gives us access to utilize those information to materialize the table.
@@ -22,18 +22,22 @@ | |||
public class WindowedSerdes { | |||
|
|||
static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> { | |||
final Serde<T> inner; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make windowed serde capable of returning raw serde type
@@ -50,9 +51,18 @@ | |||
@SuppressWarnings("unchecked") | |||
@Override | |||
void initStoreSerde(final ProcessorContext context) { | |||
final Serde<K> rawKeySerde; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get raw keySerde back when we are materializing window store.
@@ -318,15 +318,6 @@ public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() { | |||
.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)); | |||
} | |||
|
|||
@Test(expected = NullPointerException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will fail at compile time because of vague type.
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)