Skip to content

Commit

Permalink
Update InMemorySessionStore
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrick Stuedi committed Dec 31, 2021
1 parent 32ded30 commit fc613ff
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 89 deletions.
Expand Up @@ -330,9 +330,6 @@ public <R> QueryResult<R> query(final Query<R> query, final PositionBound positi
}
}

return null;

/*
return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
Expand All @@ -341,8 +338,6 @@ public <R> QueryResult<R> query(final Query<R> query, final PositionBound positi
position,
context.taskId().partition()
);
*/
}

@Override
Expand Down
Expand Up @@ -358,30 +358,6 @@ public boolean isOpen() {
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {

if (query instanceof WindowKeyQuery) {
final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = (WindowKeyQuery<Bytes, byte[]>) query;
if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) {
final Bytes key = windowKeyQuery.getKey();
final Instant lower = windowKeyQuery.getTimeFrom().get();
final Instant upper = windowKeyQuery.getTimeTo().get();
final WindowStoreIterator<byte[]> iterator = this.fetch(key, lower, upper);
final R result = (R) iterator;
final QueryResult<R> queryResult = QueryResult.forResult(result);
return queryResult;
}
} else if (query instanceof WindowRangeQuery) {
final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
if (windowRangeQuery.getTimeFrom().isPresent() &&
windowRangeQuery.getTimeTo().isPresent()) {
final Instant windowLower = windowRangeQuery.getTimeFrom().get();
final Instant windowUpper = windowRangeQuery.getTimeTo().get();
final KeyValueIterator<Windowed<Bytes>, byte[]> kvIterator = this.fetchAll(windowLower, windowUpper);
final R result = (R) kvIterator;
final QueryResult<R> queryResult = QueryResult.forResult(result);
return queryResult;
}
}

return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
Expand Down
Expand Up @@ -36,7 +36,6 @@
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
Expand Down
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
Expand All @@ -34,14 +33,11 @@
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
Expand Down Expand Up @@ -402,14 +398,14 @@ private <R> QueryResult<R> runKeyQuery(final Query<R> query,
streamsMetrics,
serdes,
time);
QueryResult<MeteredWindowStoreIterator<V>> typedQueryResult = QueryResult.forResult(typedResult);
final QueryResult<MeteredWindowStoreIterator<V>> typedQueryResult = QueryResult.forResult(typedResult);
queryResult = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no result set.
queryResult = (QueryResult<R>) rawResult;
}
} else {
queryResult = null;
queryResult = QueryResult.forUnknownQueryType(query, this);
}
return queryResult;
}
Expand Down
Expand Up @@ -137,30 +137,6 @@ public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long tim
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {

if (query instanceof WindowKeyQuery) {
final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = (WindowKeyQuery<Bytes, byte[]>) query;
if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) {
final Bytes key = windowKeyQuery.getKey();
final Instant lower = windowKeyQuery.getTimeFrom().get();
final Instant upper = windowKeyQuery.getTimeTo().get();
final WindowStoreIterator<byte[]> iterator = this.fetch(key, lower, upper);
final R result = (R) iterator;
final QueryResult<R> queryResult = QueryResult.forResult(result);
return queryResult;
}
} else if (query instanceof WindowRangeQuery) {
final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
if (windowRangeQuery.getTimeFrom().isPresent() &&
windowRangeQuery.getTimeTo().isPresent()) {
final Instant windowLower = windowRangeQuery.getTimeFrom().get();
final Instant windowUpper = windowRangeQuery.getTimeTo().get();
final KeyValueIterator<Windowed<Bytes>, byte[]> kvIterator = this.fetchAll(windowLower, windowUpper);
final R result = (R) kvIterator;
final QueryResult<R> queryResult = QueryResult.forResult(result);
return queryResult;
}
}

return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
Expand All @@ -27,11 +28,17 @@
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -65,6 +72,14 @@ QueryResult<?> apply(
mkEntry(
KeyQuery.class,
StoreQueryUtils::runKeyQuery
),
mkEntry(
WindowKeyQuery.class,
StoreQueryUtils::runWindowKeyQuery
),
mkEntry(
WindowRangeQuery.class,
StoreQueryUtils::runWindowRangeQuery
)
);

Expand Down Expand Up @@ -197,6 +212,77 @@ private static <R> QueryResult<R> runKeyQuery(final Query<R> query,
}
}

@SuppressWarnings("unchecked")
private static <R> QueryResult<R> runWindowKeyQuery(final Query<R> query,
final PositionBound positionBound,
final boolean collectExecutionInfo,
final StateStore store) {
if (store instanceof WindowStore) {
final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = (WindowKeyQuery<Bytes, byte[]>) query;
final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) store;
try {
if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) {
final WindowStoreIterator<byte[]> iterator = windowStore.fetch(windowKeyQuery.getKey(), windowKeyQuery.getTimeFrom().get(), windowKeyQuery.getTimeTo().get());
return (QueryResult<R>) QueryResult.forResult(iterator);
} else {
return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowKeyQuery requires key and window bounds to be present");
}
} catch(final Exception e){
final String message = parseStoreException(e, store, query);
return QueryResult.forFailure(
FailureReason.STORE_EXCEPTION,
message
);
}
} else {
return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "Support for WindowKeyQuery's is currently restricted to stores of type WindowStore");
}
}

@SuppressWarnings("unchecked")
private static <R> QueryResult<R> runWindowRangeQuery(final Query<R> query,
final PositionBound positionBound,
final boolean collectExecutionInfo,
final StateStore store) {
if (store instanceof WindowStore) {
final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) store;
try {
if (windowRangeQuery.getTimeFrom().isPresent() && windowRangeQuery.getTimeTo().isPresent()) {
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = windowStore.fetchAll(windowRangeQuery.getTimeFrom().get(), windowRangeQuery.getTimeTo().get());
return (QueryResult<R>) QueryResult.forResult(iterator);
} else {
return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowRangeQuery requires window bounds to be present when run against a WindowStore");
}
} catch(final Exception e){
final String message = parseStoreException(e, store, query);
return QueryResult.forFailure(
FailureReason.STORE_EXCEPTION,
message
);
}
} else if (store instanceof SessionStore) {
final WindowRangeQuery<Bytes, byte[]> windowRangeQuery = (WindowRangeQuery<Bytes, byte[]>) query;
final SessionStore<Bytes, byte[]> sessionStore = (SessionStore<Bytes, byte[]>) store;
try {
if (windowRangeQuery.getKey().isPresent()) {
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = sessionStore.fetch(windowRangeQuery.getKey().get());
return (QueryResult<R>) QueryResult.forResult(iterator);
} else {
return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "WindowRangeQuery requires key to be present when run against a SessionStore");
}
} catch(final Exception e){
final String message = parseStoreException(e, store, query);
return QueryResult.forFailure(
FailureReason.STORE_EXCEPTION,
message
);
}
} else {
return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "Support for WindowRangeQuery's is currently restricted to Window and Session stores");
}
}

private static <R> String parseStoreException(final Exception e, final StateStore store, final Query<R> query) {
final StringWriter stringWriter = new StringWriter();
final PrintWriter printWriter = new PrintWriter(stringWriter);
Expand Down

0 comments on commit fc613ff

Please sign in to comment.