KAFKA-6840: Add windowed-KTable API #5044
|
Hey @mjsax @guozhangwang, before we start the discussion thread, could you give me some initial feedback on the code or the KIP? In particular, the logic has deviated a little from our initial discussion on Jira, and I have reflected those changes in the KIP. Let me know if you need more clarification, thank you! |
e6d61cb to e6bad8a
|
Thanks @abbccdda! The AK release deadline is coming up and we need to focus on getting pending KIPs and PRs ready for the release. We will cycle back when we find time. Thanks for your understanding. (Feel free to ping us after the 2.0 release is out in case this slips.) |
Add Apache license header.
wrong class name
|
Thanks for the clarification @mjsax, so we are expecting the full 2.0 release in June? |
nit: format
nit: might be better to name it windowedKTable
to be consistent with createKTable()
+1 to rename to windowedKTable
nit: fix formatting (either move consumed down one line, or indent the other parameters to match the indentation of consumed)
We can define a local variable for key.window().start()
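The suggestion can be illustrated with a minimal, self-contained sketch. The code under review is not shown in this thread, so `Window` and `Windowed` below are hypothetical stand-ins for the Kafka Streams classes of the same name, and `describe` is an invented helper used only to show the refactoring.

```java
// Hypothetical stand-ins for the Kafka Streams Window / Windowed classes,
// reduced to the accessors used in this sketch.
final class Window {
    private final long startMs;
    private final long endMs;

    Window(long startMs, long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }

    long start() { return startMs; }
    long end() { return endMs; }
}

final class Windowed<K> {
    private final K key;
    private final Window window;

    Windowed(K key, Window window) {
        this.key = key;
        this.window = window;
    }

    K key() { return key; }
    Window window() { return window; }
}

final class WindowStartExample {
    // Read key.window().start() once and reuse the local variable,
    // instead of repeating the call chain at every use site.
    static <K> String describe(Windowed<K> key) {
        final long windowStart = key.window().start();
        final long windowSize = key.window().end() - windowStart;
        return key.key() + "@" + windowStart + "+" + windowSize;
    }

    public static void main(String[] args) {
        Windowed<String> key = new Windowed<>("user-1", new Window(1000L, 1500L));
        System.out.println(describe(key));
    }
}
```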
|
test this please |
|
Without adding license header, no tests would be run. |
|
@mjsax @guozhangwang Thanks a lot for helping review the PR. We totally understand the priority of the Kafka 2.0 release, and I am wondering if we can do a review of the API and reach an agreement? We can leave the detailed review and polish for later, when you have more time. Really appreciate your help! |
|
@abbccdda Release plan can be found here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80448820 After code freeze we should have time. Maybe even before. Just can't promise, but I'll try to squeeze it in if possible. :) |
|
@tedyu thanks for the comments. Addressing |
dbd7185 to 3ec5183
|
@Ishiihara seems passed. |
|
@Ishiihara will resolve conflict tonight |
Ishiihara
left a comment
LGTM in general. Left some minor comments.
Do we want to support session window as well?
I don't think we would support it for now, maybe in next generation
nit: might be better to set end to another timestamp.
s/Serde/serde
nit: format
Add some comments for the return type.
One question: why does this class need to be public?
We need to use it for unit testing purpose
Do we want to test both session window and time window?
For internal release, this could be postponed. Will add session window test later
e5a2524 to 4068d43
|
@mjsax @guozhangwang we are planning an internal release next week, could you take a quick look at the API and give us some feedback? Thanks! |
49b7198 to 2c6911a
|
@abbccdda One high-level comment: it seems that we have quite a bit of code duplication between the time window and the session window. It would be nice if we could reduce the duplication. |
guozhangwang
left a comment
This is a meta comment: we do not have unit tests for SessionWindowedKTableSource and SessionWindowedStoreMaterializer, is it intentional?
|
The other PR #5307 is close to being merged, I think. We should proceed with voting on and accepting the KIP in order to make progress. Then we can come back to this PR after 5307 is merged. |
09df702 to d02bca0
Hey guys @guozhangwang @Ishiihara @shawnsnguyen This is a draft version for the latest KIP-300. To keep the review efficient, please focus on the changes in the StreamsBuilder, InternalStreamsBuilder, KTableSource and TableSourceNode classes. As you can see, this is a very large change, so I want to make sure we share a common understanding of the major changes before proceeding to further unit test changes.
Major changes include:
- Generalize the #table() API in InternalStreamsBuilder so that the return type is <?, V> instead of a strict <K, V>. The reasoning is that we want to avoid too much code redundancy, and the public API can cast the return type as needed. We couldn't make the type more explicit because the underlying window store, session store and key-value store do not share a single common interface.
- Make KTableSource an abstract class and let the different table sources (kv, window, session) inherit from it. This also reduces code redundancy.
- Public API changes to support 8 new methods:
  #windowTable(topic, consumed, materialized)
  #windowTable(topic)
  #windowTable(topic, consumed)
  #windowTable(topic, materialized)
  #sessionTable(topic, consumed, materialized)
  #sessionTable(topic)
  #sessionTable(topic, consumed)
  #sessionTable(topic, materialized)
- Overload StreamSourceNode to pass in two new parameters:
  private final boolean isWindowed;
  private final ConsumedInternal<Windowed<K>, V> windowedConsumedInteral;
  which will replace the returned consumedInternal() with the windowed one. This reduces the scattering of windowed serde logic and consolidates the changes in one central place without changing many class types.
Let me know your thoughts!
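The first two items in the list above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the PR: `AbstractTableSource`, its three subclasses, `StoreKind` and `InternalBuilderSketch` are all hypothetical names, and the real `KTableSource` and `InternalStreamsBuilder` carry far more state. The sketch shows an abstract source class shared by the kv, window and session variants, an internal `table` method whose key type is a wildcard, and a public-facing method that casts the result back to the caller's key type.

```java
import java.util.Objects;

// Hypothetical stand-in for the three store flavours mentioned above.
enum StoreKind { KEY_VALUE, WINDOW, SESSION }

// Abstract base holding the logic shared by all table sources.
abstract class AbstractTableSource<K, V> {
    private final String storeName;

    AbstractTableSource(String storeName) {
        this.storeName = Objects.requireNonNull(storeName, "storeName can't be null");
    }

    String storeName() { return storeName; }

    // Each subclass only states which kind of store it writes to.
    abstract StoreKind storeKind();
}

final class KeyValueTableSource<K, V> extends AbstractTableSource<K, V> {
    KeyValueTableSource(String storeName) { super(storeName); }
    @Override StoreKind storeKind() { return StoreKind.KEY_VALUE; }
}

final class WindowTableSource<K, V> extends AbstractTableSource<K, V> {
    WindowTableSource(String storeName) { super(storeName); }
    @Override StoreKind storeKind() { return StoreKind.WINDOW; }
}

final class SessionTableSource<K, V> extends AbstractTableSource<K, V> {
    SessionTableSource(String storeName) { super(storeName); }
    @Override StoreKind storeKind() { return StoreKind.SESSION; }
}

final class InternalBuilderSketch {
    // Generalized internal entry point: the key type is a wildcard,
    // so one method can serve all three store kinds.
    static <V> AbstractTableSource<?, V> table(String storeName, StoreKind kind) {
        switch (kind) {
            case WINDOW:  return new WindowTableSource<Object, V>(storeName);
            case SESSION: return new SessionTableSource<Object, V>(storeName);
            default:      return new KeyValueTableSource<Object, V>(storeName);
        }
    }

    // Public-facing wrapper that casts the wildcard back to the key type the caller expects.
    @SuppressWarnings("unchecked")
    static <K, V> AbstractTableSource<K, V> windowTable(String storeName) {
        return (AbstractTableSource<K, V>) InternalBuilderSketch.<V>table(storeName, StoreKind.WINDOW);
    }

    public static void main(String[] args) {
        AbstractTableSource<String, Long> source = windowTable("counts-store");
        System.out.println(source.storeName() + " uses " + source.storeKind());
    }
}
```

The unchecked cast is the cost of this design: the compiler can no longer verify the key type, so the public method has to be the single place that guarantees it.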
|
@abbccdda Can you rebase and update this KIP so we can review? Thanks! |
|
@mjsax Thanks for taking the time to review the change! In fact, I would like to get the vote completed for KIP-300 before proceeding to work on this, since the rebasing effort for this change is really high now and we hope this is the last back-and-forth :). Hey @guozhangwang @vahidhashemian could you take a look at KIP-300 and give me two more binding votes when you get time? Thanks! |
mjsax
left a comment
build.gradle
Why this?
Oh, this is my local build failure, will revert lol
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
the method name changed to windowedTable and windowSize parameter is missing
windowSize should be Duration
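A minimal sketch of the `Duration` suggestion, assuming a hypothetical `windowedTable(topic, windowSize)` overload. The actual signature in the PR is not shown in this thread, and `WindowedTableSpec` and `SketchBuilder` are invented names for illustration only; they are not Kafka Streams classes.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical value holder for what a windowedTable(...) call would capture.
final class WindowedTableSpec {
    final String topic;
    final long windowSizeMs;

    WindowedTableSpec(String topic, long windowSizeMs) {
        this.topic = topic;
        this.windowSizeMs = windowSizeMs;
    }
}

// Hypothetical builder; it is not the real StreamsBuilder.
final class SketchBuilder {
    // Taking a Duration instead of a raw long makes the unit explicit at the call site.
    static WindowedTableSpec windowedTable(String topic, Duration windowSize) {
        Objects.requireNonNull(topic, "topic can't be null");
        Objects.requireNonNull(windowSize, "windowSize can't be null");
        if (windowSize.isZero() || windowSize.isNegative()) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        // Convert to milliseconds once, at the API boundary.
        return new WindowedTableSpec(topic, windowSize.toMillis());
    }

    public static void main(String[] args) {
        WindowedTableSpec spec = windowedTable("windowed-input", Duration.ofMinutes(5));
        System.out.println(spec.topic + " -> " + spec.windowSizeMs + " ms");
    }
}
```

With a `long` parameter, a caller can silently pass seconds where milliseconds are expected; with `Duration`, the unit is part of the argument itself.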
|
@mjsax Thanks so much for the review Matthias! So sorry, I'm currently clearing some backlog tasks at work and couldn't get a chance to respond to this one. I should be able to start making progress late next week. |
|
Cool. We are not in a hurry. Just wanted to make sure to unblock you. |
|
Closing this stale PR -- the corresponding KIP was never approved and the KIP itself seems abandoned atm. |
We are proposing to add a new API called windowedTable to StreamsBuilder.java, which brings in the ability to materialize a windowed topic locally. Links to related sources:
KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-300%3A+Add+Windowed+KTable+API+in+StreamsBuilder
Jira: https://issues.apache.org/jira/browse/KAFKA-6840