From 9628c1278e5dcd4f6995c89809461d350d9a530a Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Tue, 14 Sep 2021 05:40:54 +0800 Subject: [PATCH] KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order (#11292) When introducing backward iterator for WindowStroe in #9138, we forgot to make "each segment" in reverse order (i.e. in descendingMap) in InMemoryWindowStore. Fix it and add integration tests for it. Currently, in Window store, we store records in [segments -> [records] ]. For example: window size = 500, input records: key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window So, internally, the "a" and "b" will be in the same segment, and "c" in another segments. segments: [0 /* window start */, records], [500, records]. And the records for window start 0 will be "a" and "b". the records for window start 500 will be "c". Before this change, we did have a reverse iterator for segments, but not in "records". So, when doing backwardFetchAll, we'll have the records returned in order: "c", "a", "b", which should be "c", "b", "a" obviously. Reviewers: Jorge Esteban Quilcate Otoya , Anna Sophie Blee-Goldman , Guozhang Wang --- .../state/internals/InMemoryWindowStore.java | 10 +- .../AbstractWindowBytesStoreTest.java | 500 ++++++++---------- .../state/internals/WindowStoreFetchTest.java | 229 ++++++++ 3 files changed, 451 insertions(+), 288 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 7c0d21c7c7d1..ae37542ba4bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -499,10 +499,14 @@ Iterator> setRecordIterator() { final Map.Entry> currentSegment = segmentIterator.next(); currentTime = currentSegment.getKey(); - if (allKeys) { - return currentSegment.getValue().entrySet().iterator(); + final ConcurrentNavigableMap subMap = allKeys ? + currentSegment.getValue() : + currentSegment.getValue().subMap(keyFrom, true, keyTo, true); + + if (forward) { + return subMap.entrySet().iterator(); } else { - return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator(); + return subMap.descendingMap().entrySet().iterator(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index 02db7e74a814..153db976c997 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -80,6 +80,15 @@ public abstract class AbstractWindowBytesStoreTest { static final long SEGMENT_INTERVAL = 60_000L; static final long RETENTION_PERIOD = 2 * SEGMENT_INTERVAL; + final long defaultStartTime = SEGMENT_INTERVAL - 4L; + + final KeyValue, String> zero = windowedPair(0, "zero", defaultStartTime); + final KeyValue, String> one = windowedPair(1, "one", defaultStartTime + 1); + final KeyValue, String> two = windowedPair(2, "two", defaultStartTime + 2); + final KeyValue, String> three = windowedPair(3, "three", defaultStartTime + 2); + final KeyValue, String> four = windowedPair(4, "four", defaultStartTime + 4); + final KeyValue, String> five = windowedPair(5, "five", defaultStartTime + 5); + WindowStore windowStore; InternalMockProcessorContext context; MockRecordCollector recordCollector; @@ -119,122 +128,114 @@ public void after() { @Test public void testRangeAndSinglePointFetch() { - final long startTime = SEGMENT_INTERVAL - 4L; - - putFirstBatch(windowStore, startTime, context); - - assertEquals("zero", windowStore.fetch(0, startTime)); - assertEquals("one", windowStore.fetch(1, startTime + 1L)); - assertEquals("two", windowStore.fetch(2, startTime + 2L)); - assertEquals("four", windowStore.fetch(4, startTime + 4L)); - assertEquals("five", windowStore.fetch(5, startTime + 5L)); + putFirstBatch(windowStore, defaultStartTime, context); assertEquals( new HashSet<>(Collections.singletonList("zero")), valuesToSet(windowStore.fetch( 0, - ofEpochMilli(startTime + 0 - WINDOW_SIZE), - ofEpochMilli(startTime + 0 + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 0 - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0 + WINDOW_SIZE)))); - putSecondBatch(windowStore, startTime, context); + putSecondBatch(windowStore, defaultStartTime, context); - assertEquals("two+1", windowStore.fetch(2, startTime + 3L)); - assertEquals("two+2", windowStore.fetch(2, startTime + 4L)); - assertEquals("two+3", windowStore.fetch(2, startTime + 5L)); - assertEquals("two+4", windowStore.fetch(2, startTime + 6L)); - assertEquals("two+5", windowStore.fetch(2, startTime + 7L)); - assertEquals("two+6", windowStore.fetch(2, startTime + 8L)); + assertEquals("two+1", windowStore.fetch(2, defaultStartTime + 3L)); + assertEquals("two+2", windowStore.fetch(2, defaultStartTime + 4L)); + assertEquals("two+3", windowStore.fetch(2, defaultStartTime + 5L)); + assertEquals("two+4", windowStore.fetch(2, defaultStartTime + 6L)); + assertEquals("two+5", windowStore.fetch(2, defaultStartTime + 7L)); + assertEquals("two+6", windowStore.fetch(2, defaultStartTime + 8L)); assertEquals( new HashSet<>(Collections.emptyList()), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime - 2L - WINDOW_SIZE), - ofEpochMilli(startTime - 2L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime - 2L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime - 2L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.singletonList("two")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime - 1L - WINDOW_SIZE), - ofEpochMilli(startTime - 1L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime - 1L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime - 1L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two", "two+1")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime - WINDOW_SIZE), - ofEpochMilli(startTime + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two", "two+1", "two+2")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 1L - WINDOW_SIZE), - ofEpochMilli(startTime + 1L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two", "two+1", "two+2", "two+3")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 2L - WINDOW_SIZE), - ofEpochMilli(startTime + 2L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two", "two+1", "two+2", "two+3", "two+4")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 3L - WINDOW_SIZE), - ofEpochMilli(startTime + 3L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two", "two+1", "two+2", "two+3", "two+4", "two+5")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 4L - WINDOW_SIZE), - ofEpochMilli(startTime + 4L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 5L - WINDOW_SIZE), - ofEpochMilli(startTime + 5L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 5L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 5L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 6L - WINDOW_SIZE), - ofEpochMilli(startTime + 6L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 6L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 6L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two+2", "two+3", "two+4", "two+5", "two+6")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 7L - WINDOW_SIZE), - ofEpochMilli(startTime + 7L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 7L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 7L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two+3", "two+4", "two+5", "two+6")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 8L - WINDOW_SIZE), - ofEpochMilli(startTime + 8L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 8L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 8L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two+4", "two+5", "two+6")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 9L - WINDOW_SIZE), - ofEpochMilli(startTime + 9L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 9L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 9L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two+5", "two+6")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 10L - WINDOW_SIZE), - ofEpochMilli(startTime + 10L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 10L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 10L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.singletonList("two+6")), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 11L - WINDOW_SIZE), - ofEpochMilli(startTime + 11L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 11L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 11L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.emptyList()), valuesToSet(windowStore.fetch( 2, - ofEpochMilli(startTime + 12L - WINDOW_SIZE), - ofEpochMilli(startTime + 12L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 12L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 12L + WINDOW_SIZE)))); // Flush the store and verify all current entries were properly flushed ... windowStore.flush(); @@ -244,14 +245,14 @@ public void testRangeAndSinglePointFetch() { changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value())); } - final Map> entriesByKey = entriesByKey(changeLog, startTime); + final Map> entriesByKey = entriesByKey(changeLog, defaultStartTime); assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); assertEquals( Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); - assertNull(entriesByKey.get(3)); + assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3)); assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); assertNull(entriesByKey.get(6)); @@ -259,42 +260,28 @@ public void testRangeAndSinglePointFetch() { @Test public void shouldGetAll() { - final long startTime = SEGMENT_INTERVAL - 4L; - - putFirstBatch(windowStore, startTime, context); - - final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); - final KeyValue, String> one = windowedPair(1, "one", startTime + 1); - final KeyValue, String> two = windowedPair(2, "two", startTime + 2); - final KeyValue, String> four = windowedPair(4, "four", startTime + 4); - final KeyValue, String> five = windowedPair(5, "five", startTime + 5); + putFirstBatch(windowStore, defaultStartTime, context); assertEquals( - asList(zero, one, two, four, five), + asList(zero, one, two, three, four, five), toList(windowStore.all()) ); } @Test public void shouldGetAllNonDeletedRecords() { - final long startTime = SEGMENT_INTERVAL - 4L; - // Add some records - windowStore.put(0, "zero", startTime + 0); - windowStore.put(1, "one", startTime + 1); - windowStore.put(2, "two", startTime + 2); - windowStore.put(3, "three", startTime + 3); - windowStore.put(4, "four", startTime + 4); + windowStore.put(0, "zero", defaultStartTime + 0); + windowStore.put(1, "one", defaultStartTime + 1); + windowStore.put(2, "two", defaultStartTime + 2); + windowStore.put(3, "three", defaultStartTime + 3); + windowStore.put(4, "four", defaultStartTime + 4); // Delete some records - windowStore.put(1, null, startTime + 1); - windowStore.put(3, null, startTime + 3); + windowStore.put(1, null, defaultStartTime + 1); + windowStore.put(3, null, defaultStartTime + 3); // Only non-deleted records should appear in the all() iterator - final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); - final KeyValue, String> two = windowedPair(2, "two", startTime + 2); - final KeyValue, String> four = windowedPair(4, "four", startTime + 4); - assertEquals( asList(zero, two, four), toList(windowStore.all()) @@ -303,21 +290,15 @@ public void shouldGetAllNonDeletedRecords() { @Test public void shouldGetAllReturnTimestampOrderedRecords() { - final long startTime = SEGMENT_INTERVAL - 4L; - // Add some records in different order - windowStore.put(4, "four", startTime + 4); - windowStore.put(0, "zero", startTime + 0); - windowStore.put(2, "two", startTime + 2); - windowStore.put(3, "three", startTime + 3); - windowStore.put(1, "one", startTime + 1); + windowStore.put(4, "four", defaultStartTime + 4); + windowStore.put(0, "zero", defaultStartTime + 0); + windowStore.put(2, "two", defaultStartTime + 2); + windowStore.put(3, "three", defaultStartTime + 3); + windowStore.put(1, "one", defaultStartTime + 1); // Only non-deleted records should appear in the all() iterator - final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); - final KeyValue, String> one = windowedPair(1, "one", startTime + 1); - final KeyValue, String> two = windowedPair(2, "two", startTime + 2); - final KeyValue, String> three = windowedPair(3, "three", startTime + 3); - final KeyValue, String> four = windowedPair(4, "four", startTime + 4); + final KeyValue, String> three = windowedPair(3, "three", defaultStartTime + 3); assertEquals( asList(zero, one, two, three, four), @@ -327,13 +308,8 @@ public void shouldGetAllReturnTimestampOrderedRecords() { @Test public void shouldEarlyClosedIteratorStillGetAllRecords() { - final long startTime = SEGMENT_INTERVAL - 4L; - - windowStore.put(0, "zero", startTime + 0); - windowStore.put(1, "one", startTime + 1); - - final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); - final KeyValue, String> one = windowedPair(1, "one", startTime + 1); + windowStore.put(0, "zero", defaultStartTime + 0); + windowStore.put(1, "one", defaultStartTime + 1); final KeyValueIterator, String> it = windowStore.all(); assertEquals(zero, it.next()); @@ -348,302 +324,260 @@ public void shouldEarlyClosedIteratorStillGetAllRecords() { @Test public void shouldGetBackwardAll() { - final long startTime = SEGMENT_INTERVAL - 4L; - - putFirstBatch(windowStore, startTime, context); - - final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); - final KeyValue, String> one = windowedPair(1, "one", startTime + 1); - final KeyValue, String> two = windowedPair(2, "two", startTime + 2); - final KeyValue, String> four = windowedPair(4, "four", startTime + 4); - final KeyValue, String> five = windowedPair(5, "five", startTime + 5); + putFirstBatch(windowStore, defaultStartTime, context); assertEquals( - asList(five, four, two, one, zero), + asList(five, four, three, two, one, zero), toList(windowStore.backwardAll()) ); } @Test public void shouldFetchAllInTimeRange() { - final long startTime = SEGMENT_INTERVAL - 4L; - - putFirstBatch(windowStore, startTime, context); - - final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); - final KeyValue, String> one = windowedPair(1, "one", startTime + 1); - final KeyValue, String> two = windowedPair(2, "two", startTime + 2); - final KeyValue, String> four = windowedPair(4, "four", startTime + 4); - final KeyValue, String> five = windowedPair(5, "five", startTime + 5); + putFirstBatch(windowStore, defaultStartTime, context); assertEquals( - asList(one, two, four), - toList(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 4))) + asList(one, two, three, four), + toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 4))) ); assertEquals( - asList(zero, one, two), - toList(windowStore.fetchAll(ofEpochMilli(startTime + 0), ofEpochMilli(startTime + 3))) + asList(zero, one, two, three), + toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3))) ); assertEquals( - asList(one, two, four, five), - toList(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 5))) + asList(one, two, three, four, five), + toList(windowStore.fetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 5))) ); } @Test public void shouldBackwardFetchAllInTimeRange() { - final long startTime = SEGMENT_INTERVAL - 4L; - - putFirstBatch(windowStore, startTime, context); - - final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); - final KeyValue, String> one = windowedPair(1, "one", startTime + 1); - final KeyValue, String> two = windowedPair(2, "two", startTime + 2); - final KeyValue, String> four = windowedPair(4, "four", startTime + 4); - final KeyValue, String> five = windowedPair(5, "five", startTime + 5); + putFirstBatch(windowStore, defaultStartTime, context); assertEquals( - asList(four, two, one), - toList(windowStore.backwardFetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 4))) + asList(four, three, two, one), + toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 4))) ); assertEquals( - asList(two, one, zero), - toList(windowStore.backwardFetchAll(ofEpochMilli(startTime + 0), ofEpochMilli(startTime + 3))) + asList(three, two, one, zero), + toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 0), ofEpochMilli(defaultStartTime + 3))) ); assertEquals( - asList(five, four, two, one), - toList(windowStore.backwardFetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 5))) + asList(five, four, three, two, one), + toList(windowStore.backwardFetchAll(ofEpochMilli(defaultStartTime + 1), ofEpochMilli(defaultStartTime + 5))) ); } @Test public void testFetchRange() { - final long startTime = SEGMENT_INTERVAL - 4L; - - putFirstBatch(windowStore, startTime, context); - - final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); - final KeyValue, String> one = windowedPair(1, "one", startTime + 1); - final KeyValue, String> two = windowedPair(2, "two", startTime + 2); - final KeyValue, String> four = windowedPair(4, "four", startTime + 4); - final KeyValue, String> five = windowedPair(5, "five", startTime + 5); + putFirstBatch(windowStore, defaultStartTime, context); assertEquals( asList(zero, one), toList(windowStore.fetch( 0, 1, - ofEpochMilli(startTime + 0L - WINDOW_SIZE), - ofEpochMilli(startTime + 0L + WINDOW_SIZE))) + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE))) ); assertEquals( Collections.singletonList(one), toList(windowStore.fetch( 1, 1, - ofEpochMilli(startTime + 0L - WINDOW_SIZE), - ofEpochMilli(startTime + 0L + WINDOW_SIZE))) + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE))) ); assertEquals( - asList(one, two), + asList(one, two, three), toList(windowStore.fetch( 1, 3, - ofEpochMilli(startTime + 0L - WINDOW_SIZE), - ofEpochMilli(startTime + 0L + WINDOW_SIZE))) + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE))) ); assertEquals( - asList(zero, one, two), + asList(zero, one, two, three), toList(windowStore.fetch( 0, 5, - ofEpochMilli(startTime + 0L - WINDOW_SIZE), - ofEpochMilli(startTime + 0L + WINDOW_SIZE))) + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE))) ); assertEquals( - asList(zero, one, two, four, five), + asList(zero, one, two, three, four, five), toList(windowStore.fetch( 0, 5, - ofEpochMilli(startTime + 0L - WINDOW_SIZE), - ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L))) + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L))) ); assertEquals( - asList(two, four, five), + asList(two, three, four, five), toList(windowStore.fetch( 0, 5, - ofEpochMilli(startTime + 2L), - ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L))) + ofEpochMilli(defaultStartTime + 2L), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L))) ); assertEquals( Collections.emptyList(), toList(windowStore.fetch( 4, 5, - ofEpochMilli(startTime + 2L), - ofEpochMilli(startTime + WINDOW_SIZE))) + ofEpochMilli(defaultStartTime + 2L), + ofEpochMilli(defaultStartTime + WINDOW_SIZE))) ); assertEquals( Collections.emptyList(), toList(windowStore.fetch( 0, 3, - ofEpochMilli(startTime + 3L), - ofEpochMilli(startTime + WINDOW_SIZE + 5))) + ofEpochMilli(defaultStartTime + 3L), + ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5))) ); } @Test public void testBackwardFetchRange() { - final long startTime = SEGMENT_INTERVAL - 4L; - - putFirstBatch(windowStore, startTime, context); - - final KeyValue, String> zero = windowedPair(0, "zero", startTime + 0); - final KeyValue, String> one = windowedPair(1, "one", startTime + 1); - final KeyValue, String> two = windowedPair(2, "two", startTime + 2); - final KeyValue, String> four = windowedPair(4, "four", startTime + 4); - final KeyValue, String> five = windowedPair(5, "five", startTime + 5); + putFirstBatch(windowStore, defaultStartTime, context); assertEquals( asList(one, zero), toList(windowStore.backwardFetch( 0, 1, - ofEpochMilli(startTime + 0L - WINDOW_SIZE), - ofEpochMilli(startTime + 0L + WINDOW_SIZE))) + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE))) ); assertEquals( Collections.singletonList(one), toList(windowStore.backwardFetch( 1, 1, - ofEpochMilli(startTime + 0L - WINDOW_SIZE), - ofEpochMilli(startTime + 0L + WINDOW_SIZE))) + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE))) ); assertEquals( - asList(two, one), + asList(three, two, one), toList(windowStore.backwardFetch( 1, 3, - ofEpochMilli(startTime + 0L - WINDOW_SIZE), - ofEpochMilli(startTime + 0L + WINDOW_SIZE))) + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE))) ); assertEquals( - asList(two, one, zero), + asList(three, two, one, zero), toList(windowStore.backwardFetch( 0, 5, - ofEpochMilli(startTime + 0L - WINDOW_SIZE), - ofEpochMilli(startTime + 0L + WINDOW_SIZE))) + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE))) ); assertEquals( - asList(five, four, two, one, zero), + asList(five, four, three, two, one, zero), toList(windowStore.backwardFetch( 0, 5, - ofEpochMilli(startTime + 0L - WINDOW_SIZE), - ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L))) + ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L))) ); assertEquals( - asList(five, four, two), + asList(five, four, three, two), toList(windowStore.backwardFetch( 0, 5, - ofEpochMilli(startTime + 2L), - ofEpochMilli(startTime + 0L + WINDOW_SIZE + 5L))) + ofEpochMilli(defaultStartTime + 2L), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE + 5L))) ); assertEquals( Collections.emptyList(), toList(windowStore.backwardFetch( 4, 5, - ofEpochMilli(startTime + 2L), - ofEpochMilli(startTime + WINDOW_SIZE))) + ofEpochMilli(defaultStartTime + 2L), + ofEpochMilli(defaultStartTime + WINDOW_SIZE))) ); assertEquals( Collections.emptyList(), toList(windowStore.backwardFetch( 0, 3, - ofEpochMilli(startTime + 3L), - ofEpochMilli(startTime + WINDOW_SIZE + 5))) + ofEpochMilli(defaultStartTime + 3L), + ofEpochMilli(defaultStartTime + WINDOW_SIZE + 5))) ); } @Test public void testPutAndFetchBefore() { - final long startTime = SEGMENT_INTERVAL - 4L; - - putFirstBatch(windowStore, startTime, context); + putFirstBatch(windowStore, defaultStartTime, context); assertEquals( new HashSet<>(Collections.singletonList("zero")), - valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime + 0L - WINDOW_SIZE), ofEpochMilli(startTime + 0L)))); + valuesToSet(windowStore.fetch(0, ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 0L)))); assertEquals( new HashSet<>(Collections.singletonList("one")), - valuesToSet(windowStore.fetch(1, ofEpochMilli(startTime + 1L - WINDOW_SIZE), ofEpochMilli(startTime + 1L)))); + valuesToSet(windowStore.fetch(1, ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 1L)))); assertEquals( new HashSet<>(Collections.singletonList("two")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L - WINDOW_SIZE), ofEpochMilli(startTime + 2L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 2L)))); assertEquals( - new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(3, ofEpochMilli(startTime + 3L - WINDOW_SIZE), ofEpochMilli(startTime + 3L)))); + new HashSet<>(Collections.singletonList("three")), + valuesToSet(windowStore.fetch(3, ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 3L)))); assertEquals( new HashSet<>(Collections.singletonList("four")), - valuesToSet(windowStore.fetch(4, ofEpochMilli(startTime + 4L - WINDOW_SIZE), ofEpochMilli(startTime + 4L)))); + valuesToSet(windowStore.fetch(4, ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 4L)))); assertEquals( new HashSet<>(Collections.singletonList("five")), - valuesToSet(windowStore.fetch(5, ofEpochMilli(startTime + 5L - WINDOW_SIZE), ofEpochMilli(startTime + 5L)))); + valuesToSet(windowStore.fetch(5, ofEpochMilli(defaultStartTime + 5L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 5L)))); - putSecondBatch(windowStore, startTime, context); + putSecondBatch(windowStore, defaultStartTime, context); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime - 1L - WINDOW_SIZE), ofEpochMilli(startTime - 1L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime - 1L - WINDOW_SIZE), ofEpochMilli(defaultStartTime - 1L)))); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 0L - WINDOW_SIZE), ofEpochMilli(startTime + 0L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 0L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 0L)))); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 1L - WINDOW_SIZE), ofEpochMilli(startTime + 1L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 1L)))); assertEquals( new HashSet<>(Collections.singletonList("two")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L - WINDOW_SIZE), ofEpochMilli(startTime + 2L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 2L)))); assertEquals( new HashSet<>(asList("two", "two+1")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 3L - WINDOW_SIZE), ofEpochMilli(startTime + 3L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 3L)))); assertEquals( new HashSet<>(asList("two", "two+1", "two+2")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 4L - WINDOW_SIZE), ofEpochMilli(startTime + 4L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 4L)))); assertEquals( new HashSet<>(asList("two", "two+1", "two+2", "two+3")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 5L - WINDOW_SIZE), ofEpochMilli(startTime + 5L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 5L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 5L)))); assertEquals( new HashSet<>(asList("two+1", "two+2", "two+3", "two+4")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 6L - WINDOW_SIZE), ofEpochMilli(startTime + 6L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 6L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 6L)))); assertEquals( new HashSet<>(asList("two+2", "two+3", "two+4", "two+5")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 7L - WINDOW_SIZE), ofEpochMilli(startTime + 7L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 7L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 7L)))); assertEquals( new HashSet<>(asList("two+3", "two+4", "two+5", "two+6")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 8L - WINDOW_SIZE), ofEpochMilli(startTime + 8L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 8L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 8L)))); assertEquals( new HashSet<>(asList("two+4", "two+5", "two+6")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 9L - WINDOW_SIZE), ofEpochMilli(startTime + 9L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 9L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 9L)))); assertEquals( new HashSet<>(asList("two+5", "two+6")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 10L - WINDOW_SIZE), ofEpochMilli(startTime + 10L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 10L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 10L)))); assertEquals( new HashSet<>(Collections.singletonList("two+6")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 11L - WINDOW_SIZE), ofEpochMilli(startTime + 11L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 11L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 11L)))); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 12L - WINDOW_SIZE), ofEpochMilli(startTime + 12L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 12L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 12L)))); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 13L - WINDOW_SIZE), ofEpochMilli(startTime + 13L)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 13L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 13L)))); // Flush the store and verify all current entries were properly flushed ... windowStore.flush(); @@ -653,11 +587,11 @@ public void testPutAndFetchBefore() { changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value())); } - final Map> entriesByKey = entriesByKey(changeLog, startTime); + final Map> entriesByKey = entriesByKey(changeLog, defaultStartTime); assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); - assertNull(entriesByKey.get(3)); + assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3)); assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); assertNull(entriesByKey.get(6)); @@ -665,97 +599,95 @@ public void testPutAndFetchBefore() { @Test public void testPutAndFetchAfter() { - final long startTime = SEGMENT_INTERVAL - 4L; - - putFirstBatch(windowStore, startTime, context); + putFirstBatch(windowStore, defaultStartTime, context); assertEquals( new HashSet<>(Collections.singletonList("zero")), - valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime + 0L), - ofEpochMilli(startTime + 0L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(0, ofEpochMilli(defaultStartTime + 0L), + ofEpochMilli(defaultStartTime + 0L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.singletonList("one")), - valuesToSet(windowStore.fetch(1, ofEpochMilli(startTime + 1L), - ofEpochMilli(startTime + 1L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(1, ofEpochMilli(defaultStartTime + 1L), + ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.singletonList("two")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L), - ofEpochMilli(startTime + 2L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L), + ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(3, ofEpochMilli(startTime + 3L), - ofEpochMilli(startTime + 3L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(3, ofEpochMilli(defaultStartTime + 3L), + ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.singletonList("four")), - valuesToSet(windowStore.fetch(4, ofEpochMilli(startTime + 4L), - ofEpochMilli(startTime + 4L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(4, ofEpochMilli(defaultStartTime + 4L), + ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.singletonList("five")), - valuesToSet(windowStore.fetch(5, ofEpochMilli(startTime + 5L), - ofEpochMilli(startTime + 5L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(5, ofEpochMilli(defaultStartTime + 5L), + ofEpochMilli(defaultStartTime + 5L + WINDOW_SIZE)))); - putSecondBatch(windowStore, startTime, context); + putSecondBatch(windowStore, defaultStartTime, context); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime - 2L), - ofEpochMilli(startTime - 2L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime - 2L), + ofEpochMilli(defaultStartTime - 2L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.singletonList("two")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime - 1L), - ofEpochMilli(startTime - 1L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime - 1L), + ofEpochMilli(defaultStartTime - 1L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two", "two+1")), valuesToSet(windowStore - .fetch(2, ofEpochMilli(startTime), ofEpochMilli(startTime + WINDOW_SIZE)))); + .fetch(2, ofEpochMilli(defaultStartTime), ofEpochMilli(defaultStartTime + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two", "two+1", "two+2")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 1L), - ofEpochMilli(startTime + 1L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 1L), + ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two", "two+1", "two+2", "two+3")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 2L), - ofEpochMilli(startTime + 2L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 2L), + ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two+1", "two+2", "two+3", "two+4")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 3L), - ofEpochMilli(startTime + 3L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 3L), + ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two+2", "two+3", "two+4", "two+5")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 4L), - ofEpochMilli(startTime + 4L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 4L), + ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two+3", "two+4", "two+5", "two+6")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 5L), - ofEpochMilli(startTime + 5L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 5L), + ofEpochMilli(defaultStartTime + 5L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two+4", "two+5", "two+6")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 6L), - ofEpochMilli(startTime + 6L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 6L), + ofEpochMilli(defaultStartTime + 6L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("two+5", "two+6")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 7L), - ofEpochMilli(startTime + 7L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 7L), + ofEpochMilli(defaultStartTime + 7L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.singletonList("two+6")), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 8L), - ofEpochMilli(startTime + 8L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 8L), + ofEpochMilli(defaultStartTime + 8L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 9L), - ofEpochMilli(startTime + 9L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 9L), + ofEpochMilli(defaultStartTime + 9L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 10L), - ofEpochMilli(startTime + 10L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 10L), + ofEpochMilli(defaultStartTime + 10L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 11L), - ofEpochMilli(startTime + 11L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 11L), + ofEpochMilli(defaultStartTime + 11L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.emptyList()), - valuesToSet(windowStore.fetch(2, ofEpochMilli(startTime + 12L), - ofEpochMilli(startTime + 12L + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 12L), + ofEpochMilli(defaultStartTime + 12L + WINDOW_SIZE)))); // Flush the store and verify all current entries were properly flushed ... windowStore.flush(); @@ -765,14 +697,14 @@ public void testPutAndFetchAfter() { changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value())); } - final Map> entriesByKey = entriesByKey(changeLog, startTime); + final Map> entriesByKey = entriesByKey(changeLog, defaultStartTime); assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0)); assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1)); assertEquals( Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2)); - assertNull(entriesByKey.get(3)); + assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3)); assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4)); assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5)); assertNull(entriesByKey.get(6)); @@ -784,49 +716,47 @@ public void testPutSameKeyTimestamp() { windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String()); windowStore.init((StateStoreContext) context, windowStore); - final long startTime = SEGMENT_INTERVAL - 4L; - - windowStore.put(0, "zero", startTime); + windowStore.put(0, "zero", defaultStartTime); assertEquals( new HashSet<>(Collections.singletonList("zero")), - valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime - WINDOW_SIZE), - ofEpochMilli(startTime + WINDOW_SIZE)))); + valuesToSet(windowStore.fetch(0, ofEpochMilli(defaultStartTime - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + WINDOW_SIZE)))); - windowStore.put(0, "zero", startTime); - windowStore.put(0, "zero+", startTime); - windowStore.put(0, "zero++", startTime); + windowStore.put(0, "zero", defaultStartTime); + windowStore.put(0, "zero+", defaultStartTime); + windowStore.put(0, "zero++", defaultStartTime); assertEquals( new HashSet<>(asList("zero", "zero", "zero+", "zero++")), valuesToSet(windowStore.fetch( 0, - ofEpochMilli(startTime - WINDOW_SIZE), - ofEpochMilli(startTime + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("zero", "zero", "zero+", "zero++")), valuesToSet(windowStore.fetch( 0, - ofEpochMilli(startTime + 1L - WINDOW_SIZE), - ofEpochMilli(startTime + 1L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 1L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 1L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("zero", "zero", "zero+", "zero++")), valuesToSet(windowStore.fetch( 0, - ofEpochMilli(startTime + 2L - WINDOW_SIZE), - ofEpochMilli(startTime + 2L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 2L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 2L + WINDOW_SIZE)))); assertEquals( new HashSet<>(asList("zero", "zero", "zero+", "zero++")), valuesToSet(windowStore.fetch( 0, - ofEpochMilli(startTime + 3L - WINDOW_SIZE), - ofEpochMilli(startTime + 3L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 3L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 3L + WINDOW_SIZE)))); assertEquals( new HashSet<>(Collections.emptyList()), valuesToSet(windowStore.fetch( 0, - ofEpochMilli(startTime + 4L - WINDOW_SIZE), - ofEpochMilli(startTime + 4L + WINDOW_SIZE)))); + ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE), + ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE)))); // Flush the store and verify all current entries were properly flushed ... windowStore.flush(); @@ -836,7 +766,7 @@ public void testPutSameKeyTimestamp() { changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value())); } - final Map> entriesByKey = entriesByKey(changeLog, startTime); + final Map> entriesByKey = entriesByKey(changeLog, defaultStartTime); assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0)); } @@ -903,7 +833,6 @@ public void shouldFetchAndIterateOverExactKeys() { @Test public void testDeleteAndUpdate() { - final long currentTime = 0; windowStore.put(1, "one", currentTime); windowStore.put(1, "one v2", currentTime); @@ -1176,6 +1105,7 @@ private void putFirstBatch(final WindowStore store, store.put(0, "zero", startTime); store.put(1, "one", startTime + 1L); store.put(2, "two", startTime + 2L); + store.put(3, "three", startTime + 2L); store.put(4, "four", startTime + 4L); store.put(5, "five", startTime + 5L); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java new file mode 100644 index 000000000000..833ab6a074cc --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreFetchTest.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.function.Supplier; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; + +@RunWith(Parameterized.class) +public class WindowStoreFetchTest { + private enum StoreType { InMemory, RocksDB, Timed }; + private static final String STORE_NAME = "store"; + private static final int DATA_SIZE = 5; + private static final long WINDOW_SIZE = 500L; + private static final long RETENTION_MS = 10000L; + + private StoreType storeType; + private boolean enableLogging; + private boolean enableCaching; + private boolean forward; + + private LinkedList, Long>> expectedRecords; + private LinkedList> records; + private Properties streamsConfig; + + private TimeWindowedKStream windowedStream; + + public WindowStoreFetchTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { + this.storeType = storeType; + this.enableLogging = enableLogging; + this.enableCaching = enableCaching; + this.forward = forward; + + this.records = new LinkedList<>(); + this.expectedRecords = new LinkedList<>(); + final int m = DATA_SIZE / 2; + for (int i = 0; i < DATA_SIZE; i++) { + final String key = "key-" + i * 2; + final String value = "val-" + i * 2; + final KeyValue r = new KeyValue<>(key, value); + records.add(r); + records.add(r); + // expected the count of each key is 2 + final long windowStartTime = i < m ? 0 : WINDOW_SIZE; + expectedRecords.add(new KeyValue<>(new Windowed<>(key, new TimeWindow(windowStartTime, windowStartTime + WINDOW_SIZE)), 2L)); + } + } + + @Rule + public TestName testName = new TestName(); + + @Parameterized.Parameters(name = "storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}") + public static Collection data() { + final List types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB, StoreType.Timed); + final List logging = Arrays.asList(true, false); + final List caching = Arrays.asList(true, false); + final List forward = Arrays.asList(true, false); + return buildParameters(types, logging, caching, forward); + } + + @Before + public void setup() { + streamsConfig = mkProperties(mkMap( + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) + )); + } + + @Test + public void testStoreConfig() { + final Materialized> stateStoreConfig = getStoreConfig(storeType, STORE_NAME, enableLogging, enableCaching); + //Create topology: table from input topic + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream stream = builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); + stream. + groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE))) + .count(stateStoreConfig) + .toStream() + .to("output"); + + final Topology topology = builder.build(); + + try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) { + //get input topic and stateStore + final TestInputTopic input = driver + .createInputTopic("input", new StringSerializer(), new StringSerializer()); + final WindowStore stateStore = driver.getWindowStore(STORE_NAME); + + //write some data + final int medium = DATA_SIZE / 2 * 2; + for (int i = 0; i < records.size(); i++) { + final KeyValue kv = records.get(i); + final long windowStartTime = i < medium ? 0 : WINDOW_SIZE; + input.pipeInput(kv.key, kv.value, windowStartTime + i); + } + + // query the state store + try (final KeyValueIterator, Long> scanIterator = forward ? + stateStore.fetchAll(0, Long.MAX_VALUE) : + stateStore.backwardFetchAll(0, Long.MAX_VALUE)) { + + final Iterator, Long>> dataIterator = forward ? + expectedRecords.iterator() : + expectedRecords.descendingIterator(); + + TestUtils.checkEquals(scanIterator, dataIterator); + } + } + } + + private static Collection buildParameters(final List... argOptions) { + List result = new LinkedList<>(); + result.add(new Object[0]); + + for (final List argOption : argOptions) { + result = times(result, argOption); + } + + return result; + } + + private static List times(final List left, final List right) { + final List result = new LinkedList<>(); + for (final Object[] args : left) { + for (final Object rightElem : right) { + final Object[] resArgs = new Object[args.length + 1]; + System.arraycopy(args, 0, resArgs, 0, args.length); + resArgs[args.length] = rightElem; + result.add(resArgs); + } + } + return result; + } + + private Materialized> getStoreConfig(final StoreType type, final String name, final boolean cachingEnabled, final boolean loggingEnabled) { + final Supplier createStore = () -> { + if (type == StoreType.InMemory) { + return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else if (type == StoreType.RocksDB) { + return Stores.persistentWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else if (type == StoreType.Timed) { + return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } else { + return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(RETENTION_MS), + Duration.ofMillis(WINDOW_SIZE), + false); + } + }; + + final WindowBytesStoreSupplier stateStoreSupplier = createStore.get(); + final Materialized> stateStoreConfig = Materialized + .as(stateStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long()); + if (cachingEnabled) { + stateStoreConfig.withCachingEnabled(); + } else { + stateStoreConfig.withCachingDisabled(); + } + if (loggingEnabled) { + stateStoreConfig.withLoggingEnabled(new HashMap()); + } else { + stateStoreConfig.withLoggingDisabled(); + } + return stateStoreConfig; + } +}