-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KIP-617: Allow Kafka Streams State Stores to be iterated backwards #8976
Conversation
7fcb985
to
fe00350
Compare
While testing Composite stores found that stores are iterated sequentially. Should I assume that this sequence is based on time precedence (e.g. an old store finishes and another one starts takes after)? This is relevant to decide changing |
e8fb57e
to
67657a3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @jeqo , thanks for the PR! Haven't gotten through all of it yet but wanted to leave some initial comments for now. Quick meta-comment: the title of the PR should start with the JIRA ticket number (eg KAFKA-9929: Support reverse iterator on WindowStore
)
Also, it seems like we're defining the to
and from
input such that from < to
is still satisfied (as is true of forward queries). But this means the iterator actually starts at to
and then iterates to from
, which seems a bit unintuitive. Thoughts? I should have asked to clarify the to/from definitions in the KIP discussion, sorry about that.
Regarding the composite stores: the stores that this composite wraps all correspond to different keyspaces/tasks, and we don't currently guarantee any kind of ordering across the different stores on key range queries. AFAICT the ordering is pretty much meaningless at the moment, so I'd say you don't need to worry about it for reverse ranges
By the way, this PR is pretty long. Would it be possible to split this up into three separate PRs, one for each type of store? That might help facilitate review -- we can get all the major questions out of the way on the first PR so then the 2nd and 3rd would be very quick to review & merge
...ms/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
Show resolved
Hide resolved
...in/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/internals/BytesRangeValidator.java
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java
Show resolved
Hide resolved
} | ||
} | ||
|
||
|
||
|
||
private class CacheIteratorWrapper implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you looked through this class? I think it might need some additional changes for the reverse case. For example in #getNextSegmentIterator
there's a check for currentSegmentId > lastSegmentId
which seems to indicate the end of the iteration. But we'd be iterating over the segments backwards as well, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for some context, this iterator wrapper exists to fix a performance issue in the segmented stores. Due to the byte layout we were ending up iterating over records that weren't actually in the query range when going from one "segment" to another. Only the rocksdb window (and session) stores are segmented in this way, hence the check for wrapped().persistent()
before applying this wrapper. Took me a second to remember what that was all about
Anyways, this wrapper makes sure we only iterate over valid records within a segment, so I'm thinking we might need to mess with the logic to check when we're going from one segment to another since we're going through the segments in reverse order
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, I have completed the validation on #getNextSegmentIterator
, and also add these to Session store. I haven't seem test cases affected by this tho, will add new one to make sure we are covering this path.
Feel free to start with the WindowStore PR, by the way. I just noticed that was the only store listed in the original ticket and it's probably the most useful for users by far. Not to mention, we already have some plans to use it internally for KIP-450 🙂 |
@ableegoldman great feedback. It makes sense to group changes in 3 different PRs. As there is dependencies between stores, let's start with KeyValue, then Window and finally Session store. I have created #9137, #9138, #9139 to be tackled in that order. PS. I'm considering all the comments you mentioned here. Thanks! |
Awesome, thanks @jeqo ! Let me know when the first PR is ready for review again (or is it ready now?) |
KIP reference implementation. https://cwiki.apache.org/confluence/display/KAFKA/KIP-617%3A+Allow+Kafka+Streams+State+Stores+to+be+iterated+backwards
Committer Checklist (excluded from commit message)