diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 556e7cd509fbc..23b731fecb5b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -456,7 +456,7 @@ public RocksDBRangeIterator(RocksIterator iter, Serdes serdes, @Override public boolean hasNext() { - return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0; + return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index d6baf305532de..581b742ff7250 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -189,6 +189,7 @@ public void flush() { @Override public void close() { + flush(); for (Segment segment : segments) { if (segment != null) segment.close(); @@ -271,7 +272,7 @@ public WindowStoreIterator fetch(K key, long timeFrom, long timeTo) { long segTo = segmentId(Math.max(0L, timeTo)); byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes); - byte[] binaryUntil = WindowStoreUtils.toBinaryKey(key, timeTo + 1L, 0, serdes); + byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, serdes); ArrayList> iterators = new ArrayList<>(); @@ -279,7 +280,7 @@ public WindowStoreIterator fetch(K key, long timeFrom, long timeTo) { Segment segment = segments[(int) (segmentId % segments.length)]; if (segment != null && segment.id == segmentId) - iterators.add(segment.range(binaryFrom, binaryUntil)); + iterators.add(segment.range(binaryFrom, binaryTo)); } if (iterators.size() > 0) {