From fc613ff90483f8890fa12bc7525a3d2405038990 Mon Sep 17 00:00:00 2001 From: Patrick Stuedi Date: Wed, 22 Dec 2021 11:44:45 +0100 Subject: [PATCH] Update InMemorySessionStore --- .../state/internals/InMemorySessionStore.java | 5 - .../state/internals/InMemoryWindowStore.java | 24 ---- .../state/internals/MeteredSessionStore.java | 1 - .../state/internals/MeteredWindowStore.java | 8 +- .../state/internals/RocksDBWindowStore.java | 24 ---- .../state/internals/StoreQueryUtils.java | 86 +++++++++++++ .../integration/IQv2StoreIntegrationTest.java | 113 +++++++++++++----- 7 files changed, 172 insertions(+), 89 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index 4a91f50b0613..00dbb0a12d56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -330,9 +330,6 @@ public QueryResult query(final Query query, final PositionBound positi } } - return null; - - /* return StoreQueryUtils.handleBasicQueries( query, positionBound, @@ -341,8 +338,6 @@ public QueryResult query(final Query query, final PositionBound positi position, context.taskId().partition() ); - - */ } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index bd8baa63c488..864918cbed59 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -358,30 +358,6 @@ public boolean isOpen() { public QueryResult query(final Query query, final PositionBound positionBound, final boolean collectExecutionInfo) { - if (query instanceof WindowKeyQuery) { - final WindowKeyQuery windowKeyQuery = (WindowKeyQuery) query; - if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) { - final Bytes key = windowKeyQuery.getKey(); - final Instant lower = windowKeyQuery.getTimeFrom().get(); - final Instant upper = windowKeyQuery.getTimeTo().get(); - final WindowStoreIterator iterator = this.fetch(key, lower, upper); - final R result = (R) iterator; - final QueryResult queryResult = QueryResult.forResult(result); - return queryResult; - } - } else if (query instanceof WindowRangeQuery) { - final WindowRangeQuery windowRangeQuery = (WindowRangeQuery) query; - if (windowRangeQuery.getTimeFrom().isPresent() && - windowRangeQuery.getTimeTo().isPresent()) { - final Instant windowLower = windowRangeQuery.getTimeFrom().get(); - final Instant windowUpper = windowRangeQuery.getTimeTo().get(); - final KeyValueIterator, byte[]> kvIterator = this.fetchAll(windowLower, windowUpper); - final R result = (R) kvIterator; - final QueryResult queryResult = QueryResult.forResult(result); - return queryResult; - } - } - return StoreQueryUtils.handleBasicQueries( query, positionBound, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 40aea0599e66..0bb310d6181e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -36,7 +36,6 @@ import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; -import org.apache.kafka.streams.query.WindowKeyQuery; import org.apache.kafka.streams.query.WindowRangeQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 90c8a86d5d77..ebea7dda439d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; @@ -34,14 +33,11 @@ import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.query.KeyQuery; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; -import org.apache.kafka.streams.query.RangeQuery; import org.apache.kafka.streams.query.WindowKeyQuery; import org.apache.kafka.streams.query.WindowRangeQuery; -import org.apache.kafka.streams.query.internals.InternalQueryResultUtil; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; @@ -402,14 +398,14 @@ private QueryResult runKeyQuery(final Query query, streamsMetrics, serdes, time); - QueryResult> typedQueryResult = QueryResult.forResult(typedResult); + final QueryResult> typedQueryResult = QueryResult.forResult(typedResult); queryResult = (QueryResult) typedQueryResult; } else { // the generic type doesn't matter, since failed queries have no result set. queryResult = (QueryResult) rawResult; } } else { - queryResult = null; + queryResult = QueryResult.forUnknownQueryType(query, this); } return queryResult; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 0f1fb57bf43b..024c43ed2ea4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -137,30 +137,6 @@ public KeyValueIterator, byte[]> backwardFetchAll(final long tim public QueryResult query(final Query query, final PositionBound positionBound, final boolean collectExecutionInfo) { - if (query instanceof WindowKeyQuery) { - final WindowKeyQuery windowKeyQuery = (WindowKeyQuery) query; - if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) { - final Bytes key = windowKeyQuery.getKey(); - final Instant lower = windowKeyQuery.getTimeFrom().get(); - final Instant upper = windowKeyQuery.getTimeTo().get(); - final WindowStoreIterator iterator = this.fetch(key, lower, upper); - final R result = (R) iterator; - final QueryResult queryResult = QueryResult.forResult(result); - return queryResult; - } - } else if (query instanceof WindowRangeQuery) { - final WindowRangeQuery windowRangeQuery = (WindowRangeQuery) query; - if (windowRangeQuery.getTimeFrom().isPresent() && - windowRangeQuery.getTimeTo().isPresent()) { - final Instant windowLower = windowRangeQuery.getTimeFrom().get(); - final Instant windowUpper = windowRangeQuery.getTimeTo().get(); - final KeyValueIterator, byte[]> kvIterator = this.fetchAll(windowLower, windowUpper); - final R result = (R) kvIterator; - final QueryResult queryResult = QueryResult.forResult(result); - return queryResult; - } - } - return StoreQueryUtils.handleBasicQueries( query, positionBound, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java index e1d20c9e6d37..bb6880b49d73 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.api.RecordMetadata; @@ -27,11 +28,17 @@ import org.apache.kafka.streams.query.Query; import org.apache.kafka.streams.query.QueryResult; import org.apache.kafka.streams.query.RangeQuery; +import org.apache.kafka.streams.query.WindowKeyQuery; +import org.apache.kafka.streams.query.WindowRangeQuery; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; import java.io.PrintWriter; import java.io.StringWriter; +import java.time.Instant; import java.util.Map; import java.util.Optional; @@ -65,6 +72,14 @@ QueryResult apply( mkEntry( KeyQuery.class, StoreQueryUtils::runKeyQuery + ), + mkEntry( + WindowKeyQuery.class, + StoreQueryUtils::runWindowKeyQuery + ), + mkEntry( + WindowRangeQuery.class, + StoreQueryUtils::runWindowRangeQuery ) ); @@ -197,6 +212,77 @@ private static QueryResult runKeyQuery(final Query query, } } + @SuppressWarnings("unchecked") + private static QueryResult runWindowKeyQuery(final Query query, + final PositionBound positionBound, + final boolean collectExecutionInfo, + final StateStore store) { + if (store instanceof WindowStore) { + final WindowKeyQuery windowKeyQuery = (WindowKeyQuery) query; + final WindowStore windowStore = (WindowStore) store; + try { + if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) { + final WindowStoreIterator iterator = windowStore.fetch(windowKeyQuery.getKey(), windowKeyQuery.getTimeFrom().get(), windowKeyQuery.getTimeTo().get()); + return (QueryResult) QueryResult.forResult(iterator); + } else { + return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowKeyQuery requires key and window bounds to be present"); + } + } catch(final Exception e){ + final String message = parseStoreException(e, store, query); + return QueryResult.forFailure( + FailureReason.STORE_EXCEPTION, + message + ); + } + } else { + return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "Support for WindowKeyQuery's is currently restricted to stores of type WindowStore"); + } + } + + @SuppressWarnings("unchecked") + private static QueryResult runWindowRangeQuery(final Query query, + final PositionBound positionBound, + final boolean collectExecutionInfo, + final StateStore store) { + if (store instanceof WindowStore) { + final WindowRangeQuery windowRangeQuery = (WindowRangeQuery) query; + final WindowStore windowStore = (WindowStore) store; + try { + if (windowRangeQuery.getTimeFrom().isPresent() && windowRangeQuery.getTimeTo().isPresent()) { + final KeyValueIterator, byte[]> iterator = windowStore.fetchAll(windowRangeQuery.getTimeFrom().get(), windowRangeQuery.getTimeTo().get()); + return (QueryResult) QueryResult.forResult(iterator); + } else { + return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowRangeQuery requires window bounds to be present when run against a WindowStore"); + } + } catch(final Exception e){ + final String message = parseStoreException(e, store, query); + return QueryResult.forFailure( + FailureReason.STORE_EXCEPTION, + message + ); + } + } else if (store instanceof SessionStore) { + final WindowRangeQuery windowRangeQuery = (WindowRangeQuery) query; + final SessionStore sessionStore = (SessionStore) store; + try { + if (windowRangeQuery.getKey().isPresent()) { + final KeyValueIterator, byte[]> iterator = sessionStore.fetch(windowRangeQuery.getKey().get()); + return (QueryResult) QueryResult.forResult(iterator); + } else { + return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowRangeQuery requires key to be present when run against a SessionStore"); + } + } catch(final Exception e){ + final String message = parseStoreException(e, store, query); + return QueryResult.forFailure( + FailureReason.STORE_EXCEPTION, + message + ); + } + } else { + return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "Support for WindowRangeQuery's is currently restricted to Window and Session stores"); + } + } + private static String parseStoreException(final Exception e, final StateStore store, final Query query) { final StringWriter stringWriter = new StringWriter(); final PrintWriter printWriter = new PrintWriter(stringWriter); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java index 9319ed3dd6de..29836c704091 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java @@ -263,7 +263,7 @@ public StoreSupplier supplier() { } @Override - public boolean isWindowed(){ + public boolean isWindowed() { return true; } }, @@ -273,6 +273,11 @@ public StoreSupplier supplier() { return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1), WINDOW_SIZE, false); } + + @Override + public boolean isWindowed() { + return true; + } }, IN_MEMORY_SESSION { @Override @@ -281,7 +286,7 @@ public StoreSupplier supplier() { } @Override - public boolean isSession(){ + public boolean isSession() { return true; } }, @@ -292,7 +297,7 @@ public StoreSupplier supplier() { } @Override - public boolean isSession(){ + public boolean isSession() { return true; } }; @@ -311,11 +316,11 @@ public boolean keyValue() { return false; } - public boolean isWindowed(){ + public boolean isWindowed() { return false; } - public boolean isSession(){ + public boolean isSession() { return false; } } @@ -531,20 +536,35 @@ public void verifyStore() { } } - if (storeToTest.isWindowed()){ + if (storeToTest.isWindowed()) { if (storeToTest.timestamped()) { final Function, Integer> valueExtractor = ValueAndTimestamp::value; final Instant timeTo = Instant.now(); final Instant timeFrom = timeTo.minusSeconds(60); shouldHandleWindowKeyQueries(2, timeFrom, timeTo, valueExtractor); - //shouldHandleWindowRangeQueries(valueExtractor); + shouldHandleWindowRangeQueries(timeFrom, timeTo, valueExtractor); } else { final Function valueExtractor = Function.identity(); final Instant timeTo = Instant.now(); final Instant timeFrom = timeTo.minusSeconds(60); shouldHandleWindowKeyQueries(2, timeFrom, timeTo, valueExtractor); - //shouldHandleWindowRangeQueries(valueExtractor); + shouldHandleWindowRangeQueries(timeFrom, timeTo, valueExtractor); + } + } + + if (storeToTest.isSession()) { + if (storeToTest.timestamped()) { + final Function, Integer> valueExtractor = + ValueAndTimestamp::value; + final Instant timeTo = Instant.now(); + final Instant timeFrom = timeTo.minusSeconds(60); + shouldHandleSessionKeyQueries(2, valueExtractor); + } else { + final Function valueExtractor = Function.identity(); + final Instant timeTo = Instant.now(); + final Instant timeFrom = timeTo.minusSeconds(60); + shouldHandleSessionKeyQueries(2, valueExtractor); } } } @@ -593,33 +613,21 @@ private void shouldHandleWindowKeyQueries(final Integer key, final Instant t ); } - private void shouldHandleWindowRangeQueries(final Function extractor) { - shouldHandleRangeQuery( - Optional.of(1), - Optional.of(3), - extractor, - mkSet(1, 2, 3) - - ); - shouldHandleRangeQuery( - Optional.of(1), - Optional.empty(), + private void shouldHandleWindowRangeQueries(final Instant timeFrom, final Instant timeTo, final Function extractor) { + shouldHandleWindowRangeQuery( + timeFrom, + timeTo, extractor, mkSet(1, 2, 3) ); - shouldHandleRangeQuery( - Optional.empty(), - Optional.of(1), - extractor, - mkSet(0, 1) + } - ); - shouldHandleRangeQuery( - Optional.empty(), - Optional.empty(), + private void shouldHandleSessionKeyQueries(final Integer key, final Function extractor) { + shouldHandleSessionKeyQuery( + key, extractor, - mkSet(0, 1, 2, 3) + mkSet(1, 2, 3) ); } @@ -862,6 +870,53 @@ public void shouldHandleWindowRangeQuery( } } + public void shouldHandleSessionKeyQuery( + final Integer key, + final Function valueExtactor, + final Set expectedValue) { + + final WindowRangeQuery query = WindowRangeQuery.withKey(key); + + final StateQueryRequest, V>> request = + inStore(STORE_NAME) + .withQuery(query) + .withPartitions(mkSet(0, 1)) + .withPositionBound(PositionBound.at(INPUT_POSITION)); + final StateQueryResult, V>> result = + IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request); + + if (result.getGlobalResult() != null) { + fail("global tables aren't implemented"); + } else { + final Set actualValue = new HashSet<>(); + final Map, V>>> queryResult = result.getPartitionResults(); + for (final int partition : queryResult.keySet()) { + final boolean failure = queryResult.get(partition).isFailure(); + if (failure) { + throw new AssertionError(queryResult.toString()); + } + assertThat(queryResult.get(partition).isSuccess(), is(true)); + + assertThrows( + IllegalArgumentException.class, + queryResult.get(partition)::getFailureReason + ); + assertThrows( + IllegalArgumentException.class, + queryResult.get(partition)::getFailureMessage + ); + + final KeyValueIterator, V> iterator = queryResult.get(partition) + .getResult(); + while (iterator.hasNext()) { + actualValue.add(valueExtactor.apply(iterator.next().value)); + } + assertThat(queryResult.get(partition).getExecutionInfo(), is(empty())); + } + assertThat(actualValue, is(expectedValue)); + } + } + public void shouldCollectExecutionInfo() { final KeyQuery> query = KeyQuery.withKey(1);