KAFKA-13494: WindowKeyQuery and WindowRangeQuery #11567

Merged
merged 6 commits into from Jan 3, 2022

patrickstuedi
Contributor

No description provided.

@patrickstuedi patrickstuedi force-pushed the iqv2-session-query2 branch 23 times, most recently from fd081af to 9003a77 Compare December 14, 2021 14:44
@patrickstuedi patrickstuedi force-pushed the iqv2-session-query2 branch 5 times, most recently from 1416b91 to 88aa972 Compare December 21, 2021 15:07
Contributor

@vvcephei vvcephei left a comment

Hey @patrickstuedi, it looks like your PR is still in progress, so I just gave it a quick pass. I hope this helps!

Comment on lines 322 to 345
        if (query instanceof WindowRangeQuery) {
            @SuppressWarnings("unchecked") final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
            if (windowRangeQuery.getKey().isPresent()) {
                final Bytes key = windowRangeQuery.getKey().get();
                final KeyValueIterator<Windowed<Bytes>, byte[]> keyValueIterator = this.fetch(key);
                @SuppressWarnings("unchecked") final R result = (R) keyValueIterator;
                final QueryResult<R> queryResult = QueryResult.forResult(result);
                return queryResult;
            }
        }

        return null;

        /*
        return StoreQueryUtils.handleBasicQueries(
            query,
            positionBound,
            collectExecutionInfo,
            this,
            position,
            context.taskId().partition()
        );

        */
Contributor

I assume that the commented code is just a temporary state while you're finalizing the PR.

Since executing these queries doesn't require any specialization to the concrete store type, just the WindowStore and SessionStore interfaces, I think you might want to move the logic into StoreQueryUtils. That said, rather than having a universal handleBasicQueries method for any state store, I've been thinking the code would be cleaner, with fewer type checks and casts, if you instead added new handleWindowQueries and handleSessionQueries methods.

What do you think?
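
For illustration, the split proposed above might look roughly like the following. This is only a sketch under stated assumptions: `Query`, `WindowRange`, `SessionKey`, `WindowStoreLike`, and `SessionStoreLike` are hypothetical stand-ins, not the Kafka Streams classes, and the return type is simplified to an `Optional`. The point it shows is that a handler which receives an already-typed store only has to inspect the query type, never the store type.

```java
import java.util.List;
import java.util.Optional;

// All types below are hypothetical stand-ins, not Kafka Streams classes.
interface Query { }

final class WindowRange implements Query {
    final long from;
    final long to;
    WindowRange(final long from, final long to) { this.from = from; this.to = to; }
}

final class SessionKey implements Query {
    final String key;
    SessionKey(final String key) { this.key = key; }
}

interface WindowStoreLike { List<String> fetchAll(long from, long to); }

interface SessionStoreLike { List<String> fetch(String key); }

final class QueryHandlersSketch {
    // Answers only the queries a window store supports; empty stands for "unknown query".
    static Optional<List<String>> handleWindowQueries(final Query query, final WindowStoreLike store) {
        if (query instanceof WindowRange) {
            final WindowRange range = (WindowRange) query;
            return Optional.of(store.fetchAll(range.from, range.to));
        }
        return Optional.empty();
    }

    // Answers only the queries a session store supports; empty stands for "unknown query".
    static Optional<List<String>> handleSessionQueries(final Query query, final SessionStoreLike store) {
        if (query instanceof SessionKey) {
            return Optional.of(store.fetch(((SessionKey) query).key));
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final WindowStoreLike windows = (from, to) -> List.of("window " + from + ".." + to);
        final SessionStoreLike sessions = key -> List.of("session " + key);
        System.out.println(handleWindowQueries(new WindowRange(0, 10), windows));
        System.out.println(handleSessionQueries(new WindowRange(0, 10), sessions));
    }
}
```

Each handler takes the store interface it needs as a parameter, so no `instanceof` check or cast on the store is required inside it.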

Contributor Author

I was looking at your merged PR and coded the store query routines in a similar spirit. For instance, I followed your practice in the MeteredKeyValue store and applied it to the MeteredWindow/Session stores. Similarly, for the InMemoryKeyValue store I noticed you handle range queries in place, so I did the same for the InMemoryWindowStore. Generally, if you feel we should handle queries in the util class (as some are), then we should probably be consistent across the stores: remove the local handlers and delegate to the store utils instead? Or, alternatively, pass the per-store handlers to StoreQueryUtils.

Comment on lines 374 to 375
if (windowRangeQuery.getTimeFrom().isPresent() &&
windowRangeQuery.getTimeTo().isPresent()) {
Contributor

It looks like this method will return "unknown query" if the window bounds aren't specified. Can we make sure there's a test case for WindowRangeQuery#withKey?

Contributor

Ah, I was previously mistaken about this. My prior comment was made with your KIP-763 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-763%3A+Range+queries+with+open+endpoints) in mind.

Upon looking at the WindowStore code and the KIP, I now see that we don't support open ranges on window stores, and that the KIP was only about KeyValue range queries.

I'm also just now catching on that you're using Optional.of there, not Optional.ofNullable, so it's always the case that we either have both bounds and no key or a key with no bounds.

I had a second concern that the window store is not handling the case of a range query with a key and no bounds, but upon examination of the WindowStore, I can see that there is no method to handle that case, so it makes sense to return "unknown query" for it (since the objective at this moment is parity). It might be nice if we additionally explain that the error is because the store can't handle the parameterization rather than the query itself, though.

And finally, just to clarify, I do think we should consolidate on handling the queries in StoreQueryUtils rather than in the stores themselves, so these comments are meant to apply to the handler in that class, and we should just delete the code in this class.
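
The Optional.of observation above can be sketched as follows. `RangeQuerySketch` is a hypothetical stand-in written for this illustration, not the class under review; it only assumes two factory methods named like the ones discussed in this thread. Because `Optional.of` throws on `null` (whereas `Optional.ofNullable` would silently produce an empty value), each factory can only yield one of the two parameterizations.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in for the query class under review, not the Kafka Streams source.
final class RangeQuerySketch<K> {
    private final Optional<K> key;
    private final Optional<Instant> timeFrom;
    private final Optional<Instant> timeTo;

    private RangeQuerySketch(final Optional<K> key,
                             final Optional<Instant> timeFrom,
                             final Optional<Instant> timeTo) {
        this.key = key;
        this.timeFrom = timeFrom;
        this.timeTo = timeTo;
    }

    // Optional.of rejects null, so a query built here always has a key and no bounds.
    static <K> RangeQuerySketch<K> withKey(final K key) {
        return new RangeQuerySketch<>(Optional.of(key), Optional.empty(), Optional.empty());
    }

    // Likewise, both bounds are always present here and the key is always absent.
    static <K> RangeQuerySketch<K> withWindowStartRange(final Instant from, final Instant to) {
        return new RangeQuerySketch<>(Optional.empty(), Optional.of(from), Optional.of(to));
    }

    boolean hasKey() {
        return key.isPresent();
    }

    boolean hasBothBounds() {
        return timeFrom.isPresent() && timeTo.isPresent();
    }

    public static void main(final String[] args) {
        final RangeQuerySketch<String> byKey = withKey("k");
        System.out.println(byKey.hasKey() + " " + byKey.hasBothBounds());
        try {
            withWindowStartRange(null, Instant.EPOCH);
        } catch (final NullPointerException e) {
            System.out.println("null bound rejected");
        }
    }
}
```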

@patrickstuedi patrickstuedi force-pushed the iqv2-session-query2 branch 3 times, most recently from bf4d070 to 05625f7 Compare December 22, 2021 13:19
Patrick Stuedi and others added 4 commits December 25, 2021 22:00
…Query.java

Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
…Query.java

Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
@patrickstuedi patrickstuedi force-pushed the iqv2-session-query2 branch 2 times, most recently from c281005 to b4671ff Compare December 25, 2021 22:12
@vvcephei vvcephei changed the title WindowKeyQuery and WindowRangeQuery KAFKA-13494: WindowKeyQuery and WindowRangeQuery Dec 28, 2021
@vvcephei
Contributor

Hey @patrickstuedi, thanks for the last batch of updates. Aside from the build failure, I think these are my last comments:

  1. After scrolling around the PR a few times to try and clarify whether we're handling all the right cases, I'm more convinced now that we really should consolidate all the query handling logic into StoreQueryUtils. I gather from your comment that you feel the same way. Let's just go for it in this PR. The thing you saw in the other PR was just an oversight.
  2. Now that I'm really confronted with the reality of the code, I think it's going to be pretty confusing for people that WindowRangeQuery only works with WindowStore when you do withWindowStartRange(from, to) and only works with SessionStore when you do withKey(key). I still think it's ok to do that, but I think we ought to add more description to the failure message than just "this store doesn't know how to handle this query". Can you replace the else cases there with QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "...") and a message that more accurately explains what the problem is? It would also be nice to have negative test cases for these to verify that the correct message makes it to the user.

Once those points are addressed, we should be ready to merge!
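
As a rough sketch of the second point, the else case could return a failure whose message names the unsupported parameterization. Everything below is a hypothetical stand-in: `FailureReason` and `ResultSketch` only mirror the names mentioned in the comment, the boolean flag is a simplification of the real query object, and the message text is an illustrative assumption rather than the wording used in the PR.

```java
// Hypothetical stand-ins that mirror only the names mentioned in the review comment.
enum FailureReason { UNKNOWN_QUERY_TYPE }

final class ResultSketch<R> {
    final R result;             // null on failure
    final FailureReason reason; // null on success
    final String message;       // null on success

    private ResultSketch(final R result, final FailureReason reason, final String message) {
        this.result = result;
        this.reason = reason;
        this.message = message;
    }

    static <R> ResultSketch<R> forResult(final R result) {
        return new ResultSketch<>(result, null, null);
    }

    static <R> ResultSketch<R> forFailure(final FailureReason reason, final String message) {
        return new ResultSketch<>(null, reason, message);
    }

    boolean isFailure() {
        return reason != null;
    }
}

final class WindowStoreHandlerSketch {
    // keyOnly == true models withKey(key); false models withWindowStartRange(from, to).
    static ResultSketch<String> handleWindowRangeQuery(final boolean keyOnly) {
        if (!keyOnly) {
            return ResultSketch.forResult("window-range-results");
        }
        // The message explains which parameterization is unsupported (illustrative wording).
        return ResultSketch.forFailure(
            FailureReason.UNKNOWN_QUERY_TYPE,
            "This store can run WindowRangeQuery only with a window start range, not with a key alone."
        );
    }

    public static void main(final String[] args) {
        final ResultSketch<String> failed = handleWindowRangeQuery(true);
        System.out.println(failed.reason + ": " + failed.message);
    }
}
```

A negative test would then assert on both the failure reason and the message text, which is what the test block for this sketch does.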

Contributor

@vvcephei vvcephei left a comment

Hey @patrickstuedi, I really hope you don't mind, but this PR has been dragging out for a long time, so I thought you might appreciate some help in getting it closed out.

I left a bunch of comments to explain the changes.

import java.time.Instant;
import java.util.Optional;

public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {
Contributor

Suggested change
- public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {
+ @Evolving
+ public class WindowRangeQuery<K, V> implements Query<KeyValueIterator<Windowed<K>, V>> {

@@ -317,13 +318,25 @@ public boolean isOpen() {
@Override
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {

if (query instanceof WindowRangeQuery) {
Contributor

@vvcephei vvcephei Dec 28, 2021

Hey @patrickstuedi, GitHub won't let me comment on the prior conversation, but thanks for pointing out that oversight in the in-memory key-value store! I went ahead and removed it as well.

@@ -58,7 +58,7 @@
         final Query<R> query,
         final StateStore store) {

-        return new FailedQueryResult<>(
+        return forFailure(
Contributor

This just made it easier to inline the "unknown query" message.

Comment on lines +33 to +35
private WindowKeyQuery(final K key,
final Optional<Instant> timeTo,
final Optional<Instant> timeFrom) {
Contributor

Since I was fixing stuff anyway, I went ahead and fixed a bunch of formatting issues that I didn't bother mentioning before.

Instant.ofEpochMilli(windowStart),
Instant.ofEpochMilli(windowStart),
extractor,
mkSet(2)
Contributor

Since we're specifying the key, we expect only to get back windows with the value for that key. The aggregation we specified is to sum all values for the key, and it comes out to 2 because we only write one value for each key; namely, the value is the same number as the key.

Comment on lines 611 to 636
        // miss the window start range
        shouldHandleWindowKeyQuery(
            2,
            Instant.ofEpochMilli(windowStart - 1),
            Instant.ofEpochMilli(windowStart - 1),
            extractor,
            mkSet()
        );

        // miss the key
        shouldHandleWindowKeyQuery(
            999,
            Instant.ofEpochMilli(windowStart),
            Instant.ofEpochMilli(windowStart),
            extractor,
            mkSet()
        );

        // miss both
        shouldHandleWindowKeyQuery(
            999,
            Instant.ofEpochMilli(windowStart - 1),
            Instant.ofEpochMilli(windowStart - 1),
            extractor,
            mkSet()
        );
Contributor

It seemed like a good idea to check a few other query configurations, but none of them showed any problems.

Comment on lines +856 to +935
        try (final WindowStoreIterator<V> iterator = queryResult.get(partition).getResult()) {
            while (iterator.hasNext()) {
                actualValue.add(valueExtactor.apply(iterator.next().value));
            }
Contributor

These iterators need to be closed or they'll leak resources (it's the same for IQv1 as well).
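
A minimal, self-contained sketch of the pattern in the snippet above: `TrackingIterator` is a hypothetical stand-in for a store iterator that simply records whether `close()` was called, so the effect of try-with-resources can be observed without a running store.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical closeable iterator standing in for a store iterator; it records whether close() ran.
final class TrackingIterator implements Iterator<Integer>, AutoCloseable {
    private final Iterator<Integer> delegate;
    boolean closed = false;

    TrackingIterator(final List<Integer> values) {
        this.delegate = values.iterator();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public Integer next() {
        return delegate.next();
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class CloseIteratorSketch {
    // Drains the iterator; try-with-resources calls close() even if the loop throws.
    static List<Integer> drain(final TrackingIterator source) {
        final List<Integer> out = new ArrayList<>();
        try (TrackingIterator iterator = source) {
            while (iterator.hasNext()) {
                out.add(iterator.next());
            }
        }
        return out;
    }

    public static void main(final String[] args) {
        final TrackingIterator iterator = new TrackingIterator(List.of(1, 2, 3));
        System.out.println(drain(iterator) + " closed=" + iterator.closed);
    }
}
```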

Comment on lines +951 to +1030
        try (final KeyValueIterator<Windowed<Integer>, V> iterator = queryResult.get(partition).getResult()) {
            while (iterator.hasNext()) {
                actualValue.add((Integer) iterator.next().value);
            }
Contributor

Huh, looks like @vpapavas and I missed the need to close the iterator before.

);

// Should fail to execute this query on a WindowStore.
Contributor

I also added some cases to test the specialized failure messages.

@vvcephei
Contributor

vvcephei commented Jan 3, 2022

The test failures were unrelated:

    Build / JDK 17 and Scala 2.13 / kafka.controller.ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions()
    Build / JDK 11 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, Security=PLAINTEXT

@vvcephei vvcephei merged commit b8f1cf1 into apache:trunk Jan 3, 2022