MINOR: Improve checks for CogroupedStreamAggregateBuilder #9141
Nice! Just some minor cleaning up left over after you removed a bunch of stuff. (I just opened it in IDEA and looked for anything that was greyed out). Give it a pass to make sure the method parameters are all aligned properly after you remove the generics -- a tip from experience, I always forget to fix that after making changes.
Also, GitHub won't let me comment on this line because it's too far from the changes in the PR, but can you fix the alignment of the parameters down in `#createRepartitionSource` as well?
@@ -47,18 +47,96 @@
CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
    this.builder = builder;
}

<KR, VIn, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
<KR, VIn, W extends Window> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
WDYT about naming this just `build`? It's not as clear, but I think it's in line with the naming conventions elsewhere. For example we have `KStreamWindowAggregate` and `KStreamAggregate` (then maybe we can come up with a more descriptive name for the method currently called `build`).
Also you can remove the `VIn` and `W extends Window` generics on this method.
}

private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(final Aggregator<? super K, Object, VOut> aggregator,
final Initializer<VOut> initializer,
private <W extends Window> StatefulProcessorNode<K, ?> getStatefulProcessorNode(final Initializer<VOut> initializer,
We can remove the `<W extends Window>` now, right? Also it looks like the `initializer` input isn't needed anymore either.
}

private void build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final StoreBuilder<?> storeBuilder) {
Can we call this something like `ensureCopartitioning` or `processRepartitions` or something? My take is that the copartitioning is the main point of this method, so that's probably good to include in the name.
I went with `processRepartitions` and then updated all the other methods to just be `build` with different parameters, since that seems cleaner.
}
}

<KR, VIn, W extends Window> KTable<KR, VOut> createTable(final Collection<StreamsGraphNode> processors,
Can remove the `W extends Window` generic.
return createTable(processors, named, keySerde, valueSerde, queryableName);
}

<KR, VIn, W extends Window> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
Don't need `VIn` here.
Honestly I think it's fine to just name all three of these `build`, since they accept different parameters and it should be pretty clear from the context whether it's windowed or not. But being more descriptive is never a bad thing either. Your call 🙂
return createTable(processors, named, keySerde, valueSerde, queryableName);
}

<KR, VIn, W extends Window> KTable<KR, VOut> buildSessionWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
Don't need `VIn` and `W extends Window` here.
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
    final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
        initializer,
        named.suffixWithOrElseGet(
            "-cogroup-agg-" + counter++,
            builder,
            CogroupedKStreamImpl.AGGREGATE_NAME),
        stateCreated,
        storeBuilder,
        new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
    stateCreated = true;
    processors.add(statefulProcessorNode);
    builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
}
return createTable(processors, named, keySerde, valueSerde, queryableName);
This is all the same for all three methods except for the `KStreamAggregate`/`KStreamWindowAggregate`/etc, right? I think if you wanted to further deduplicate things you could factor this out into a method that accepts a `Function<KGroupedStreamImpl, KStreamAggProcessorSupplier>`, and then each of the `build` methods can just pass in a function that returns `new KStreamWindowAggregate` and so on.
I'm not sure whether it's really worth it, but it can be done in case you were wondering. Up to you whether you want to do it.
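For illustration, the deduplication suggested above could look roughly like the sketch below. Everything in it is a hypothetical stand-in rather than the real Kafka Streams code: `ProcessorSupplier` plays the role of `KStreamAggProcessorSupplier`, plain strings replace grouped streams, aggregators, and graph nodes, and the factory function receives the aggregator instead of a `KGroupedStreamImpl` to keep the example short. The point is only that the shared loop lives in one place and each `build` variant passes in the one piece that differs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class CogroupDedupSketch {

    // Hypothetical stand-in for KStreamAggProcessorSupplier.
    interface ProcessorSupplier {
        String describe();
    }

    // Hypothetical stand-in for `new KStreamAggregate<>(...)`.
    static ProcessorSupplier aggregate(final String store, final String aggregator) {
        return () -> "KStreamAggregate(" + store + ", " + aggregator + ")";
    }

    // Hypothetical stand-in for `new KStreamWindowAggregate<>(...)`.
    static ProcessorSupplier windowAggregate(final String store, final String aggregator) {
        return () -> "KStreamWindowAggregate(" + store + ", " + aggregator + ")";
    }

    // Shared loop: the only part that varies is how the supplier is created,
    // so that part is passed in as a function.
    static List<String> buildProcessors(final Map<String, String> groupPatterns,
                                        final Function<String, ProcessorSupplier> supplierFactory) {
        final List<String> processors = new ArrayList<>();
        int counter = 0;
        for (final Map.Entry<String, String> entry : groupPatterns.entrySet()) {
            final ProcessorSupplier supplier = supplierFactory.apply(entry.getValue());
            processors.add("cogroup-agg-" + counter++ + ":" + supplier.describe());
        }
        return processors;
    }

    // Each variant only supplies its own factory.
    static List<String> build(final Map<String, String> groupPatterns, final String store) {
        return buildProcessors(groupPatterns, aggregator -> aggregate(store, aggregator));
    }

    static List<String> buildWindowed(final Map<String, String> groupPatterns, final String store) {
        return buildProcessors(groupPatterns, aggregator -> windowAggregate(store, aggregator));
    }

    public static void main(final String[] args) {
        final Map<String, String> groupPatterns = new LinkedHashMap<>();
        groupPatterns.put("stream-a", "aggA");
        groupPatterns.put("stream-b", "aggB");
        build(groupPatterns, "store").forEach(System.out::println);
        buildWindowed(groupPatterns, "store").forEach(System.out::println);
    }
}
```

Whether the extra indirection pays off for three short methods is a judgment call, as noted above.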
Retest this please.
Hey @lct45, thanks for this improvement!
It LGTM, but can you fix the indentation before I merge this?
"-cogroup-merge",
builder,
CogroupedKStreamImpl.MERGE_NAME);
It seems like your indentation is set to 8 spaces instead of 4.
All the tests were green except for the flaky
Verified the last commit only changed whitespace before merging.
Updated `CogroupedStreamAggregateBuilder` to have individual builders depending on the windowed aggregation, or lack thereof. This replaces the previous approach of passing every option into a single builder method, with all but the current type of aggregation set to null, and then checking which value was not null.
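As a rough sketch of the change described above (not the actual Kafka Streams code), the snippet below contrasts a single entry point that dispatches on null checks with separate overloads per aggregation type. `TimeWindowsSpec`, `SessionWindowsSpec`, and the returned strings are hypothetical stand-ins chosen for illustration.

```java
import java.util.Objects;

class BuilderOverloadSketch {

    // Hypothetical stand-in for a time-window definition.
    static final class TimeWindowsSpec {
        final long sizeMs;
        TimeWindowsSpec(final long sizeMs) {
            this.sizeMs = sizeMs;
        }
    }

    // Hypothetical stand-in for a session-window definition.
    static final class SessionWindowsSpec {
        final long gapMs;
        SessionWindowsSpec(final long gapMs) {
            this.gapMs = gapMs;
        }
    }

    // Before: one method takes every option; callers pass null for the ones
    // they do not use, and the method checks which value is non-null.
    static String buildWithNullChecks(final String store,
                                      final TimeWindowsSpec windows,
                                      final SessionWindowsSpec sessionWindows) {
        if (windows != null && sessionWindows != null) {
            throw new IllegalArgumentException("only one window type may be set");
        }
        if (windows != null) {
            return "KStreamWindowAggregate(" + store + ", sizeMs=" + windows.sizeMs + ")";
        }
        if (sessionWindows != null) {
            return "KStreamSessionWindowAggregate(" + store + ", gapMs=" + sessionWindows.gapMs + ")";
        }
        return "KStreamAggregate(" + store + ")";
    }

    // After: one overload per aggregation type, so each call site states
    // which kind of aggregation it wants and no null dispatch is needed.
    static String build(final String store) {
        return "KStreamAggregate(" + store + ")";
    }

    static String build(final String store, final TimeWindowsSpec windows) {
        Objects.requireNonNull(windows, "windows");
        return "KStreamWindowAggregate(" + store + ", sizeMs=" + windows.sizeMs + ")";
    }

    static String build(final String store, final SessionWindowsSpec sessionWindows) {
        Objects.requireNonNull(sessionWindows, "sessionWindows");
        return "KStreamSessionWindowAggregate(" + store + ", gapMs=" + sessionWindows.gapMs + ")";
    }

    public static void main(final String[] args) {
        System.out.println(buildWithNullChecks("store", new TimeWindowsSpec(1000L), null));
        System.out.println(build("store", new TimeWindowsSpec(1000L)));
        System.out.println(build("store", new SessionWindowsSpec(500L)));
        System.out.println(build("store"));
    }
}
```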