
KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order #11292

Merged
merged 4 commits into from Sep 13, 2021

Conversation

@showuon (Contributor) commented Sep 2, 2021

When introducing the backward iterator for WindowStore in #9138, we forgot to put "each segment" in reverse order (i.e. use descendingMap) in InMemoryWindowStore. Fix it and add integration tests for it.

Currently, in Window store, we store records in [segments -> [records] ].

For example:
window size = 500,
input records:

key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window

So, internally, "a" and "b" will be in the same segment, and "c" in another segment.
segments: [0 /* window start */, records], [500, records].
And the records for window start 0 will be "a" and "b".
the records for window start 500 will be "c".

Before this change, we did have a reverse iterator for the segments, but not for the "records" within each segment. So when doing backwardFetchAll, the records were returned in the order "c", "a", "b", when they should obviously be "c", "b", "a".
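The two-level layout above can be sketched in isolation (a minimal sketch with illustrative names, not the store's actual code): segments are a sorted map keyed by window start, each holding a sorted map of records, and a correct backward fetch must reverse both levels, not just the outer one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class BackwardFetchSketch {
    // segments: window start -> (record key -> value), both levels sorted ascending
    static TreeMap<Long, TreeMap<String, String>> segments = new TreeMap<>();

    // Buggy backward fetch: reverses the segment order only; records inside
    // each segment still come back in ascending key order.
    static List<String> buggyBackward() {
        List<String> out = new ArrayList<>();
        for (NavigableMap<String, String> records : segments.descendingMap().values()) {
            out.addAll(records.keySet());
        }
        return out;
    }

    // Fixed backward fetch: also reverses the records inside each segment
    // via descendingMap(), as the fix does.
    static List<String> fixedBackward() {
        List<String> out = new ArrayList<>();
        for (NavigableMap<String, String> records : segments.descendingMap().values()) {
            out.addAll(records.descendingMap().keySet());
        }
        return out;
    }

    public static void main(String[] args) {
        segments.computeIfAbsent(0L, w -> new TreeMap<>()).put("a", "aa");
        segments.computeIfAbsent(0L, w -> new TreeMap<>()).put("b", "bb");
        segments.computeIfAbsent(500L, w -> new TreeMap<>()).put("c", "cc");
        System.out.println(buggyBackward()); // [c, a, b]
        System.out.println(fixedBackward()); // [c, b, a]
    }
}
```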

So, back to the question: why did the original test cases not catch this issue?
It's because the test inputs all have different window start timestamps, so they land in different segments:

 private void putFirstBatch(final WindowStore<Integer, String> store,
                               @SuppressWarnings("SameParameterValue") final long startTime,
                               final InternalMockProcessorContext context) {
        context.setRecordContext(createRecordContext(startTime));
        store.put(0, "zero", startTime);
        store.put(1, "one", startTime + 1L);
        store.put(2, "two", startTime + 2L);
        store.put(3, "three", startTime + 2L);  // <-- this is the new record I added, to test multiple records in the same segment case

        store.put(4, "four", startTime + 4L);
        store.put(5, "five", startTime + 5L);
    }

I added an additional record to the AbstractWindowBytesStoreTest test. In WindowStoreFetchTest, we produce records at timestamps 0, 1, 500, 501, 502, which are put into windows [0, 500] (2 records) and [500, 1000] (3 records). We then fetch them forward/backward to see if the results are as expected, i.e. in reverse order it should be 502, 501, 500, 1, 0.
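As a quick sanity check (a hypothetical helper, not the store's actual code), here is how those test timestamps land in windows, assuming tumbling windows of size 500:

```java
public class WindowAssignmentSketch {
    // Tumbling-window start: the timestamp rounded down to a multiple of
    // the window size.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    public static void main(String[] args) {
        long windowSize = 500L;
        for (long ts : new long[] {0L, 1L, 500L, 501L, 502L}) {
            System.out.println("ts=" + ts + " -> window starting at " + windowStart(ts, windowSize));
        }
        // ts=0 and ts=1 fall in the window starting at 0;
        // ts=500, 501, 502 fall in the window starting at 500.
    }
}
```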

The behavior works as expected in RocksDBWindowStore.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@showuon showuon changed the title [WIP] KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order Sep 2, 2021
@@ -1176,6 +1105,7 @@ private void putFirstBatch(final WindowStore<Integer, String> store,
store.put(0, "zero", startTime);
store.put(1, "one", startTime + 1L);
store.put(2, "two", startTime + 2L);
store.put(3, "three", startTime + 2L);
@showuon (Contributor Author):
Add 2 records at the same timestamp to test the forward and backward fetch cases.

Comment on lines -502 to +509
-        if (allKeys) {
-            return currentSegment.getValue().entrySet().iterator();
-        } else {
-            return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator();
-        }
+        final ConcurrentNavigableMap<Bytes, byte[]> subMap = allKeys ?
+            currentSegment.getValue() :
+            currentSegment.getValue().subMap(keyFrom, true, keyTo, true);
+
+        if (forward) {
+            return subMap.entrySet().iterator();
+        } else {
+            return subMap.descendingMap().entrySet().iterator();
+        }
@showuon (Contributor Author):

Before this change, when setting up the records iterator, we only considered the allKeys case, not the forward/backward direction. Fix it.
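As a standalone illustration of the pattern in the diff (using the JDK's ConcurrentSkipListMap, with illustrative names): the key range is selected first, then the direction is applied, and descendingMap() is a reversed view of the same subMap, so the range is preserved while the iteration order flips.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SubMapDirectionSketch {
    static List<String> fetch(ConcurrentNavigableMap<String, byte[]> segment,
                              boolean allKeys, String keyFrom, String keyTo, boolean forward) {
        // Pick the key range first, then apply the direction, mirroring the fix.
        ConcurrentNavigableMap<String, byte[]> subMap = allKeys
            ? segment
            : segment.subMap(keyFrom, true, keyTo, true);
        return new ArrayList<>(forward ? subMap.keySet() : subMap.descendingMap().keySet());
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<String, byte[]> segment = new ConcurrentSkipListMap<>();
        segment.put("a", new byte[0]);
        segment.put("b", new byte[0]);
        segment.put("c", new byte[0]);
        System.out.println(fetch(segment, true, null, null, true));  // [a, b, c]
        System.out.println(fetch(segment, false, "a", "b", false));  // [b, a]
    }
}
```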

@showuon commented Sep 2, 2021

@jeqo @ableegoldman @guozhangwang , please help review this PR. Thank you.

Comment on lines 109 to 112
final List<StoreType> types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed);
final List<Boolean> logging = Arrays.asList(true, false);
final List<Boolean> caching = Arrays.asList(true, false);
final List<Boolean> forward = Arrays.asList(true, false);
@showuon (Contributor Author):

Test all combinations.
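The cross-product that the parameterized runner expands to can be sketched as plain Java (not the actual JUnit wiring):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParameterCombinations {
    // Expand the cartesian product of all test dimensions, the way a
    // parameterized JUnit runner would enumerate them.
    static List<Object[]> buildCases() {
        List<Object[]> cases = new ArrayList<>();
        for (String type : Arrays.asList("InMemory", "RocksDB", "Timed"))
            for (boolean logging : Arrays.asList(true, false))
                for (boolean caching : Arrays.asList(true, false))
                    for (boolean forward : Arrays.asList(true, false))
                        cases.add(new Object[] {type, logging, caching, forward});
        return cases;
    }

    public static void main(String[] args) {
        System.out.println(buildCases().size()); // 3 * 2 * 2 * 2 = 24 parameter sets
    }
}
```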

@jeqo (Contributor) left a comment:

LGTM. Great catch @showuon, thanks!

Love the parametrized integration tests to validate each combination.


private TimeWindowedKStream<String, String> windowedStream;

public WindowStoreFetchIntegrationTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) {
@jeqo (Contributor):

Seems more like a unit test than an integration (though a particularly thorough one😜 )

@ableegoldman (Contributor) left a comment:
The reason will be displayed to describe this comment to others. Learn more.

LGTM, but can you move the parametrized test out of the integration tests? It's not exactly a clear-cut definition, but we usually consider the heavier, longer-running tests to be integration tests, e.g. those that start up an actual KafkaStreams instance, read/write/verify data, etc.

@guozhangwang (Contributor) left a comment:
The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix @showuon! It LGTM. I also feel the same as @ableegoldman: it seems a unit test would suffice, which would take much less time as well :)

final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", defaultStartTime);
final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", defaultStartTime + 1);
final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", defaultStartTime + 2);
final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", defaultStartTime + 2);


Is this supposed to be defaultStartTime + 3?

@showuon (Contributor Author):

Thanks for your comment, but here I made it defaultStartTime + 2 on purpose, to test that when the window start time is the same, the forward/backward query API returns results in the expected order. I updated the PR description to make this clear. Thank you.

@showuon commented Sep 8, 2021

@ableegoldman @guozhangwang, thanks for your comments. I agree that it doesn't need to be an integration test. I've moved it out of the integration tests. Thank you.

Failed tests are unrelated.

    Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
    Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
    Build / JDK 11 and Scala 2.13 / kafka.server.DelayedOperationTest.testDelayedFuture()

@guozhangwang (Contributor) left a comment:

Thanks @showuon. I still have a few questions in mind, just for my own clarification:

  1. Do you know why the original test cases in AbstractWindowBytesStoreTest, like shouldGetBackwardAll and testBackwardFetchRange did not capture this bug? This test class is leveraged by the in-memory stores as well.

  2. Related to 1), what additional coverage does the new WindowStoreFetchTest provide in addition to the above two test cases?

@showuon commented Sep 9, 2021

@guozhangwang , those are good questions. Let me answer them below:

1. Do you know why the original test cases in AbstractWindowBytesStoreTest, like `shouldGetBackwardAll` and `testBackwardFetchRange` did not capture this bug? This test class is leveraged by the in-memory stores as well.

That's right, those tests also test the in-memory stores, but they didn't test the case of multiple records in the same window. Currently, in the window store, we store records as [segments -> [records] ].

For example:
window size = 500,
input records:

key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window

So, internally, "a" and "b" will be in the same segment, and "c" in another segment.
segments: [0 /* window start */, records], [500, records].
And the records for window start 0 will be "a" and "b".
the records for window start 500 will be "c".

Before this change, we did have a reverse iterator for the segments, but not for the "records" within each segment. So when doing backwardFetchAll, the records were returned in the order "c", "a", "b", when they should obviously be "c", "b", "a".

So, back to the question: why did the original test cases not catch this issue?
It's because the test inputs all have different window start timestamps, so they land in different segments:

 private void putFirstBatch(final WindowStore<Integer, String> store,
                               @SuppressWarnings("SameParameterValue") final long startTime,
                               final InternalMockProcessorContext context) {
        context.setRecordContext(createRecordContext(startTime));
        store.put(0, "zero", startTime);
        store.put(1, "one", startTime + 1L);
        store.put(2, "two", startTime + 2L);
        store.put(3, "three", startTime + 2L);  // <-- this is the new record I added, to test multiple records in the same segment case

        store.put(4, "four", startTime + 4L);
        store.put(5, "five", startTime + 5L);
    }
2. Related to 1), what additional coverage does the new `WindowStoreFetchTest` provide in addition to the above two test cases?

I think I've answered this above. I added an additional record to the AbstractWindowBytesStoreTest test. In WindowStoreFetchTest, we produce records at timestamps 0, 1, 500, 501, 502, which are put into windows [0, 500] (2 records) and [500, 1000] (3 records). We then fetch them forward/backward to see if the results are as expected, i.e. in reverse order it should be 502, 501, 500, 1, 0.

The behavior works as expected in RocksDBWindowStore.

Hope that's clear.
I also updated in the PR description.
Thank you.

@guozhangwang (Contributor), replying to:

> Hope that's clear.
> I also updated in the PR description.
> Thank you.

Thanks!

@guozhangwang guozhangwang merged commit 9628c12 into apache:trunk Sep 13, 2021
@guozhangwang

Merged to trunk.

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…rder (apache#11292)


Reviewers: Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <wangguoz@gmail.com>