KAFKA-12446: Call subtractor before adder if key is the same #10747
I am just working through the review backlog and re-discovered this PR. Left a few comments. If you are still interested in working on this, I'm happy to help you get it merged.
Thanks for updating the PR. -- Thinking about it once more, I actually think we need a KIP for this change, because of backward compatibility considerations. Should not be a complex KIP, but I don't feel confident merging this change without an approved KIP.
@fqaiser94 -- are you interested in doing a KIP to move this forward?
Heya @mjsax, apologies I've been travelling recently so I haven't been able to respond to any of your messages 😓
Great! Sure, this works. Looking forward to your KIP -- let me know if you have any questions about the KIP process.
Updated PR based on minor feedback received during KIP-904 discussions.
Sorry for the delay in reviewing. Last few weeks were very busy. -- Still think we can make the 3.5 feature freeze deadline by next Wednesday.
Overall LGTM, just a couple of minor comments.
One thing: we should add a system test to test the upgrade path. There is `StreamsUpgradeTest.java` and `streams_upgrade_test.py` (cf. `test_rolling_upgrade_with_2_bounces`) as a starting point.
Might be best to do a similar thing as for FK-joins, and add a new test variation -- the tricky thing about the test would be to ensure that the repartition topic is not empty when we do the bounce, so we should set up the test accordingly. -- If you want, we can also push this PR over the finish line, and add the system test in a follow-up PR.
I've started investigating this but it's unlikely that I will have this done before next Wednesday as I have an extremely busy next 2 weeks and this is looking a bit tricky (can't even get the existing system tests to pass for some reason currently).
Thanks for the quick turnaround! Merged. We can still do a follow-up on the open question with regard to the serialization format. Hope we can get a system test sooner rather than later. If you are too busy, maybe somebody else can pick it up. In any case: Thanks for the KIP and the PR!
Follow-up PR to use varint encoding: #13533
Thanks for your support with this PR/KIP!
Yea, sorry, I won't have the time to figure this out for at least the next couple of weeks :/
For full details, please see KIP-904.
Problem
During `KTable.groupBy`, we write into a repartition topic. Since the grouping key can change, we need to send separate events for the oldValue and the newValue to downstream nodes (where they will be “subtracted” from the aggregate for the old key and “added” to the aggregate for the new key, respectively).

However, sending the oldValue and the newValue as separate events is not strictly necessary when the grouping key does not change, and doing so poses two challenges for users:

1. The resulting aggregate (`KTable.groupBy(???).aggregate(???)`) can briefly be in an “inconsistent” state where the oldValue has been “subtracted” from the aggregate for the key but the newValue has not yet been “added” to it, because each event (oldValue, newValue) is processed separately.
2. Because the oldValue and the newValue are sent in separate calls to `send()`, it’s possible the newValue may be sent (and therefore processed by the aggregator) before the oldValue. If the user’s `adder` and `subtractor` functions are non-commutative, this would put the aggregate in a permanently “inconsistent” state.

Whilst there are ways to get around this issue by dropping down to the Processor API level, it would be nicer if this was handled by Kafka Streams more seamlessly.
Solution
If the grouping key has not changed, the oldValue and newValue events are guaranteed to be processed by the same processor. As such, we should be able to send them as a single `Change<T>` event. The subtractor and adder functions can then be executed (in that order) and the KTable can be updated in a single “atomic” operation. In this way, we are able to remove any possibility of ending up in an “inconsistent” state. Also, note that sending the oldValue and newValue in the same event ensures that they can’t be re-ordered relative to each other, irrespective of how a user has configured the producer settings.
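The idea can be sketched roughly as follows in plain Java. This is not the code from the PR: the `Change` and `Event` classes here are simplified stand-ins, and `toEvents` and `applyChange` are hypothetical helper names. The sketch also assumes an update where both an old and a new value exist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;

public class CombinedChangeSketch {
    // Simplified stand-in for a Change<T> event; not the real Kafka Streams class.
    static final class Change<T> {
        final T newValue;
        final T oldValue;
        Change(T newValue, T oldValue) { this.newValue = newValue; this.oldValue = oldValue; }
    }

    // Hypothetical keyed event destined for the repartition topic.
    static final class Event<K, T> {
        final K key;
        final Change<T> change;
        Event(K key, Change<T> change) { this.key = key; this.change = change; }
    }

    // One combined event if the grouping key is unchanged, otherwise two events.
    static <K, T> List<Event<K, T>> toEvents(K oldKey, T oldValue, K newKey, T newValue) {
        List<Event<K, T>> events = new ArrayList<>();
        if (Objects.equals(oldKey, newKey)) {
            events.add(new Event<>(newKey, new Change<>(newValue, oldValue)));
        } else {
            events.add(new Event<>(oldKey, new Change<>(null, oldValue)));
            events.add(new Event<>(newKey, new Change<>(newValue, null)));
        }
        return events;
    }

    // Subtractor runs before adder; the caller stores the result in one write.
    static <T, A> A applyChange(A agg, Change<T> change,
                                BiFunction<A, T, A> adder, BiFunction<A, T, A> subtractor) {
        A result = agg;
        if (change.oldValue != null) {
            result = subtractor.apply(result, change.oldValue);
        }
        if (change.newValue != null) {
            result = adder.apply(result, change.newValue);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event<String, Integer>> events = toEvents("k", 3, "k", 5);
        Integer agg = applyChange(10, events.get(0).change, (a, v) -> a + v, (a, v) -> a - v);
        System.out.println(events.size() + " event(s), aggregate = " + agg); // 1 event(s), aggregate = 12
    }
}
```

Because both values travel in one event, the intermediate aggregate (7 in this run) is never written or observed, and there is no second event that could overtake the first.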