From abc3e1c888ceee941d557381a8cb8a7df8af2058 Mon Sep 17 00:00:00 2001 From: kkloudas Date: Tue, 10 Oct 2017 16:40:57 +0200 Subject: [PATCH] [FLINK-7826][QS] Add support for all types of state to the QS Client. --- .../common/state/FoldingStateDescriptor.java | 2 +- .../client/QueryableStateClient.java | 57 +- .../state/ImmutableAggregatingState.java | 71 ++ .../client/state/ImmutableFoldingState.java | 70 ++ .../client/state/ImmutableListState.java | 70 ++ .../client/state/ImmutableMapState.java | 139 ++++ .../client/state/ImmutableReducingState.java | 69 ++ .../client/state/ImmutableState.java | 29 + .../client/state/ImmutableStateBinder.java | 80 +++ .../client/state/ImmutableValueState.java | 69 ++ .../network/AbstractServerHandler.java | 2 +- .../itcases/AbstractQueryableStateITCase.java | 644 ++++++++++++++---- .../state/ImmutableAggregatingStateTest.java | 114 ++++ .../state/ImmutableFoldingStateTest.java | 94 +++ .../state/ImmutableListStateTest.java | 112 +++ .../state/ImmutableMapStateTest.java | 189 +++++ .../state/ImmutableReducingStateTest.java | 84 +++ .../state/ImmutableValueStateTest.java | 70 ++ .../streaming/api/datastream/KeyedStream.java | 6 +- .../api/datastream/QueryableStateStream.java | 28 +- .../streaming/api/scala/KeyedStream.scala | 6 +- 21 files changed, 1817 insertions(+), 188 deletions(-) create mode 100644 flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java create mode 100644 flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java index f7609c39bf134..09540477e9941 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java @@ -30,7 +30,7 @@ * {@link StateDescriptor} for {@link FoldingState}. This can be used to create partitioned * folding state. * - * @param Type of the values folded int othe state + * @param Type of the values folded in the other state * @param Type of the value in the state * * @deprecated will be removed in a future version in favor of {@link AggregatingStateDescriptor} diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java index 005c874c85c4e..70bccf0df5d95 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java @@ -21,10 +21,12 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.queryablestate.client.state.ImmutableStateBinder; import org.apache.flink.queryablestate.messages.KvStateRequest; import org.apache.flink.queryablestate.messages.KvStateResponse; import org.apache.flink.queryablestate.network.Client; @@ -141,15 +143,15 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) { * @param key The key we are interested in. * @param keyTypeHint A {@link TypeHint} used to extract the type of the key. * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the result. + * @return Future holding the immutable {@link State} object containing the result. */ @PublicEvolving - public CompletableFuture getKvState( + public CompletableFuture getKvState( final JobID jobId, final String queryableStateName, final K key, final TypeHint keyTypeHint, - final StateDescriptor stateDescriptor) { + final StateDescriptor stateDescriptor) { Preconditions.checkNotNull(keyTypeHint); @@ -164,15 +166,15 @@ public CompletableFuture getKvState( * @param key The key we are interested in. * @param keyTypeInfo The {@link TypeInformation} of the key. * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the result. + * @return Future holding the immutable {@link State} object containing the result. */ @PublicEvolving - public CompletableFuture getKvState( + public CompletableFuture getKvState( final JobID jobId, final String queryableStateName, final K key, final TypeInformation keyTypeInfo, - final StateDescriptor stateDescriptor) { + final StateDescriptor stateDescriptor) { return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); @@ -187,48 +189,17 @@ public CompletableFuture getKvState( * @param keyTypeInfo The {@link TypeInformation} of the keys. * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the result. + * @return Future holding the immutable {@link State} object containing the result. */ @PublicEvolving - public CompletableFuture getKvState( + public CompletableFuture getKvState( final JobID jobId, final String queryableStateName, final K key, final N namespace, final TypeInformation keyTypeInfo, final TypeInformation namespaceTypeInfo, - final StateDescriptor stateDescriptor) { - - Preconditions.checkNotNull(stateDescriptor); - - // initialize the value serializer based on the execution config. - stateDescriptor.initializeSerializerUnlessSet(executionConfig); - TypeSerializer stateSerializer = stateDescriptor.getSerializer(); - - return getKvState(jobId, queryableStateName, key, - namespace, keyTypeInfo, namespaceTypeInfo, stateSerializer); - } - - /** - * Returns a future holding the request result. - * @param jobId JobID of the job the queryable state belongs to. - * @param queryableStateName Name under which the state is queryable. - * @param key The key that the state we request is associated with. - * @param namespace The namespace of the state. - * @param keyTypeInfo The {@link TypeInformation} of the keys. - * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. - * @param stateSerializer The {@link TypeSerializer} of the state we want to query. - * @return Future holding the result. - */ - @PublicEvolving - public CompletableFuture getKvState( - final JobID jobId, - final String queryableStateName, - final K key, - final N namespace, - final TypeInformation keyTypeInfo, - final TypeInformation namespaceTypeInfo, - final TypeSerializer stateSerializer) { + final StateDescriptor stateDescriptor) { Preconditions.checkNotNull(jobId); Preconditions.checkNotNull(queryableStateName); @@ -237,7 +208,7 @@ public CompletableFuture getKvState( Preconditions.checkNotNull(keyTypeInfo); Preconditions.checkNotNull(namespaceTypeInfo); - Preconditions.checkNotNull(stateSerializer); + Preconditions.checkNotNull(stateDescriptor); TypeSerializer keySerializer = keyTypeInfo.createSerializer(executionConfig); TypeSerializer namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig); @@ -253,8 +224,8 @@ public CompletableFuture getKvState( return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply( stateResponse -> { try { - return KvStateSerializer.deserializeValue(stateResponse.getContent(), stateSerializer); - } catch (IOException e) { + return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent())); + } catch (Exception e) { throw new FlinkRuntimeException(e); } }); diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java new file mode 100644 index 0000000000000..b853cfc4b8c5a --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link AggregatingState} that does not allow for modifications. + * + *

This is the type of the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link AggregatingStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableAggregatingState extends ImmutableState implements AggregatingState { + + private final OUT value; + + private ImmutableAggregatingState(OUT value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public OUT get() { + return value; + } + + @Override + public void add(Object newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableAggregatingState createState( + final AggregatingStateDescriptor stateDescriptor, + final byte[] serializedValue) throws IOException { + + final ACC accumulator = KvStateSerializer.deserializeValue( + serializedValue, + stateDescriptor.getSerializer()); + + final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator); + return new ImmutableAggregatingState<>(state); + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java new file mode 100644 index 0000000000000..a12adaad17619 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link FoldingState} that does not allow for modifications. + * + *

This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link FoldingStateDescriptor}. + */ +@PublicEvolving +@Deprecated +public final class ImmutableFoldingState extends ImmutableState implements FoldingState { + + private final ACC value; + + private ImmutableFoldingState(ACC value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public ACC get() { + return value; + } + + @Override + public void add(Object newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableFoldingState createState( + final FoldingStateDescriptor stateDescriptor, + final byte[] serializedState) throws IOException { + + final ACC state = KvStateSerializer.deserializeValue( + serializedState, + stateDescriptor.getSerializer()); + return new ImmutableFoldingState<>(state); + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java new file mode 100644 index 0000000000000..841690581236c --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; + +/** + * A read-only {@link ListState} that does not allow for modifications. + * + *

This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link ListStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableListState extends ImmutableState implements ListState { + + private final List listState; + + private ImmutableListState(final List state) { + this.listState = Preconditions.checkNotNull(state); + } + + @Override + public Iterable get() { + return listState; + } + + @Override + public void add(V value) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableListState createState( + final ListStateDescriptor stateDescriptor, + final byte[] serializedState) throws IOException { + + final List state = KvStateSerializer.deserializeList( + serializedState, + stateDescriptor.getElementSerializer()); + return new ImmutableListState<>(state); + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java new file mode 100644 index 0000000000000..c216d5dbcd0c0 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * A read-only {@link MapState} that does not allow for modifications. + * + *

This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link MapStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableMapState extends ImmutableState implements MapState { + + private final Map state; + + private ImmutableMapState(final Map mapState) { + this.state = Preconditions.checkNotNull(mapState); + } + + @Override + public V get(K key) { + return state.get(key); + } + + @Override + public void put(K key, V value) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void putAll(Map map) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void remove(K key) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public boolean contains(K key) { + return state.containsKey(key); + } + + /** + * Returns all the mappings in the state in a {@link Collections#unmodifiableSet(Set)}. + * + * @return A read-only iterable view of all the key-value pairs in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterable> entries() { + return Collections.unmodifiableSet(state.entrySet()); + } + + /** + * Returns all the keys in the state in a {@link Collections#unmodifiableSet(Set)}. + * + * @return A read-only iterable view of all the keys in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterable keys() { + return Collections.unmodifiableSet(state.keySet()); + } + + /** + * Returns all the values in the state in a {@link Collections#unmodifiableCollection(Collection)}. + * + * @return A read-only iterable view of all the values in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterable values() { + return Collections.unmodifiableCollection(state.values()); + } + + /** + * Iterates over all the mappings in the state. The iterator cannot + * remove elements. + * + * @return A read-only iterator over all the mappings in the state + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterator> iterator() { + return Collections.unmodifiableSet(state.entrySet()).iterator(); + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableMapState createState( + final MapStateDescriptor stateDescriptor, + final byte[] serializedState) throws IOException { + + final Map state = KvStateSerializer.deserializeMap( + serializedState, + stateDescriptor.getKeySerializer(), + stateDescriptor.getValueSerializer()); + return new ImmutableMapState<>(state); + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java new file mode 100644 index 0000000000000..da08c5375f36b --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link ReducingState} that does not allow for modifications. + * + *

This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link ReducingStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableReducingState extends ImmutableState implements ReducingState { + + private final V value; + + private ImmutableReducingState(V value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public V get() { + return value; + } + + @Override + public void add(V newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableReducingState createState( + final ReducingStateDescriptor stateDescriptor, + final byte[] serializedState) throws IOException { + + final V state = KvStateSerializer.deserializeValue( + serializedState, + stateDescriptor.getSerializer()); + return new ImmutableReducingState<>(state); + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java new file mode 100644 index 0000000000000..863f07ba772de --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +/** + * A base class for the read-only types of state returned + * as results from the Queryable State Client. + */ +abstract class ImmutableState { + + protected static final UnsupportedOperationException MODIFICATION_ATTEMPT_ERROR = + new UnsupportedOperationException("State is read-only. No modifications allowed."); +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java new file mode 100644 index 0000000000000..6ce2787963a30 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateBinder; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.util.Preconditions; + +/** + * A {@link StateBinder} used to deserialize the results returned by the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient}. + * + *

The result is an immutable {@link org.apache.flink.api.common.state.State State} + * object containing the requested result. + */ +public class ImmutableStateBinder implements StateBinder { + + private final byte[] serializedState; + + public ImmutableStateBinder(final byte[] content) { + serializedState = Preconditions.checkNotNull(content); + } + + @Override + public ValueState createValueState(ValueStateDescriptor stateDesc) throws Exception { + return ImmutableValueState.createState(stateDesc, serializedState); + } + + @Override + public ListState createListState(ListStateDescriptor stateDesc) throws Exception { + return ImmutableListState.createState(stateDesc, serializedState); + } + + @Override + public ReducingState createReducingState(ReducingStateDescriptor stateDesc) throws Exception { + return ImmutableReducingState.createState(stateDesc, serializedState); + } + + @Override + public AggregatingState createAggregatingState(AggregatingStateDescriptor stateDesc) throws Exception { + return ImmutableAggregatingState.createState(stateDesc, serializedState); + } + + @Override + public FoldingState createFoldingState(FoldingStateDescriptor stateDesc) throws Exception { + return ImmutableFoldingState.createState(stateDesc, serializedState); + } + + @Override + public MapState createMapState(MapStateDescriptor stateDesc) throws Exception { + return ImmutableMapState.createState(stateDesc, serializedState); + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java new file mode 100644 index 0000000000000..7fd6457944b27 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link ValueState} that does not allow for modifications. + * + *

This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link ValueStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableValueState extends ImmutableState implements ValueState { + + private final V value; + + private ImmutableValueState(V value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public V value() { + return value; + } + + @Override + public void update(V newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static ImmutableValueState createState( + final ValueStateDescriptor stateDescriptor, + final byte[] serializedState) throws IOException { + + final V state = KvStateSerializer.deserializeValue( + serializedState, + stateDescriptor.getSerializer()); + return new ImmutableValueState<>(state); + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java index b9bf671ebc8d8..18a88dacb9ac6 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java @@ -121,7 +121,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception // Execute actual query async, because it is possibly // blocking (e.g. file I/O). // - // A submission failure is not treated as fatal. todo here if there is a shared resource e.g. registry, then I will have to sync on that. + // A submission failure is not treated as fatal. queryExecutor.submit(new AsyncRequestTask<>(this, ctx, requestId, request, stats)); } else { diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java index a096f55fa82bf..69316fa6e1e6c 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java @@ -19,17 +19,27 @@ package org.apache.flink.queryablestate.itcases; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; @@ -49,12 +59,18 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.QueryableStateStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -63,13 +79,16 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; @@ -198,7 +217,7 @@ public Integer getKey(Tuple2 value) throws Exception { while (!allNonZero && deadline.hasTimeLeft()) { allNonZero = true; - final List>> futures = new ArrayList<>(numKeys); + final List>>> futures = new ArrayList<>(numKeys); for (int i = 0; i < numKeys; i++) { final int key = i; @@ -210,7 +229,7 @@ public Integer getKey(Tuple2 value) throws Exception { allNonZero = false; } - CompletableFuture> result = getKvStateWithRetries( + CompletableFuture>> result = getKvStateWithRetries( client, jobId, queryName, @@ -221,9 +240,14 @@ public Integer getKey(Tuple2 value) throws Exception { false, executor); - result.thenAccept(res -> { - counts.set(key, res.f1); - assertEquals("Key mismatch", key, res.f0.intValue()); + result.thenAccept(response -> { + try { + Tuple2 res = response.get(); + counts.set(key, res.f1); + assertEquals("Key mismatch", key, res.f0.intValue()); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } }); futures.add(result); @@ -406,7 +430,7 @@ public Integer getKey(Tuple2 value) throws Exception { cluster.submitJobDetached(jobGraph); - executeQuery(deadline, client, jobId, "hakuna", valueState, numElements); + executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements); } finally { // Free cluster resources if (jobId != null) { @@ -485,7 +509,7 @@ public Integer getKey(Tuple2 value) throws Exception { cluster.submitJobDetached(jobGraph); - executeQuery(deadline, client, jobId, "hakuna", valueState, expected); + executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected); } finally { // Free cluster resources if (jobId != null) { @@ -501,87 +525,6 @@ public Integer getKey(Tuple2 value) throws Exception { } } - /** - * Retry a query for state for keys between 0 and {@link #maxParallelism} until - * expected equals the value of the result tuple's second field. - */ - private void executeQuery( - final Deadline deadline, - final QueryableStateClient client, - final JobID jobId, - final String queryableStateName, - final StateDescriptor> stateDescriptor, - final long expected) throws Exception { - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - CompletableFuture> future = getKvStateWithRetries( - client, - jobId, - queryableStateName, - key, - BasicTypeInfo.INT_TYPE_INFO, - stateDescriptor, - QUERY_RETRY_DELAY, - false, - executor); - - Tuple2 value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50L); - } - } - - assertTrue("Did not succeed query", success); - } - } - - /** - * Retry a query for state for keys between 0 and {@link #maxParallelism} until - * expected equals the value of the result tuple's second field. - */ - private void executeQuery( - final Deadline deadline, - final QueryableStateClient client, - final JobID jobId, - final String queryableStateName, - final TypeSerializer> valueSerializer, - final long expected) throws Exception { - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - Future> future = getKvStateWithRetries(client, - jobId, - queryableStateName, - key, - BasicTypeInfo.INT_TYPE_INFO, - valueSerializer, - QUERY_RETRY_DELAY, - false, - executor); - - Tuple2 value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50L); - } - } - - assertTrue("Did not succeed query", success); - } - } - /** * Tests simple value state queryable state instance with a default value * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements) @@ -647,7 +590,7 @@ public Integer getKey( // Now query int key = 0; - CompletableFuture> future = getKvStateWithRetries( + CompletableFuture>> future = getKvStateWithRetries( client, jobId, queryableState.getQueryableStateName(), @@ -730,8 +673,9 @@ public Integer getKey(Tuple2 value) throws Exception { cluster.submitJobDetached(jobGraph); - executeQuery(deadline, client, jobId, "matata", - queryableState.getValueSerializer(), numElements); + final ValueStateDescriptor> stateDesc = + (ValueStateDescriptor>) queryableState.getStateDescriptor(); + executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements); } finally { // Free cluster resources @@ -809,10 +753,10 @@ public Integer getKey(Tuple2 value) throws Exception { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture future = getKvStateWithRetries( + CompletableFuture, String>> future = getKvStateWithRetries( client, jobId, - queryableState.getQueryableStateName(), + "pumba", key, BasicTypeInfo.INT_TYPE_INFO, foldingState, @@ -820,7 +764,9 @@ public Integer getKey(Tuple2 value) throws Exception { false, executor); - String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); + + //assertEquals("Key mismatch", key, value.f0.intValue()); if (expected.equals(value)) { success = true; } else { @@ -898,12 +844,150 @@ public Integer getKey(Tuple2 value) throws Exception { cluster.submitJobDetached(jobGraph); - // Wait until job is running + // Now query + long expected = numElements * (numElements + 1L) / 2L; + + for (int key = 0; key < maxParallelism; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + CompletableFuture>> future = getKvStateWithRetries( + client, + jobId, + "jungle", + key, + BasicTypeInfo.INT_TYPE_INFO, + reducingState, + QUERY_RETRY_DELAY, + false, + executor); + + Tuple2 value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); + + assertEquals("Key mismatch", key, value.f0.intValue()); + if (expected == value.f1) { + success = true; + } else { + // Retry + Thread.sleep(50L); + } + } + + assertTrue("Did not succeed query", success); + } + } finally { + // Free cluster resources + if (jobId != null) { + CompletableFuture cancellation = FutureUtils.toJava(cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); + + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } + + client.shutdown(); + } + } + + /** + * Tests simple map state queryable state instance. Each source emits + * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then + * queried. The map state instance sums the values up. The test succeeds + * after each subtask index is queried with result n*(n+1)/2. + */ + @Test + public void testMapState() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + final QueryableStateClient client = new QueryableStateClient( + "localhost", + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream> source = env + .addSource(new TestAscendingValueSource(numElements)); + + final MapStateDescriptor> mapStateDescriptor = new MapStateDescriptor<>( + "timon", + BasicTypeInfo.INT_TYPE_INFO, + source.getType()); + mapStateDescriptor.setQueryable("timon-queryable"); + + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).process(new ProcessFunction, Object>() { + private static final long serialVersionUID = -805125545438296619L; + + private transient MapState> mapState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + mapState = getRuntimeContext().getMapState(mapStateDescriptor); + } + + @Override + public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception { + Tuple2 v = mapState.get(value.f0); + if (v == null) { + v = new Tuple2<>(value.f0, 0L); + } + mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1)); + } + }); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); // Now query long expected = numElements * (numElements + 1L) / 2L; - executeQuery(deadline, client, jobId, "jungle", reducingState, expected); + for (int key = 0; key < maxParallelism; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + CompletableFuture>> future = getKvStateWithRetries( + client, + jobId, + "timon-queryable", + key, + BasicTypeInfo.INT_TYPE_INFO, + mapStateDescriptor, + QUERY_RETRY_DELAY, + false, + executor); + + Tuple2 value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key); + assertEquals("Key mismatch", key, value.f0.intValue()); + if (expected == value.f1) { + success = true; + } else { + // Retry + Thread.sleep(50L); + } + } + + assertTrue("Did not succeed query", success); + } } finally { // Free cluster resources if (jobId != null) { @@ -919,6 +1003,227 @@ public Integer getKey(Tuple2 value) throws Exception { } } + /** + * Tests simple list state queryable state instance. Each source emits + * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then + * queried. The list state instance add the values to the list. The test + * succeeds after each subtask index is queried and the list contains + * the correct number of distinct elements. + */ + @Test + public void testListState() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + final QueryableStateClient client = new QueryableStateClient( + "localhost", + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream> source = env + .addSource(new TestAscendingValueSource(numElements)); + + final ListStateDescriptor listStateDescriptor = new ListStateDescriptor( + "list", + BasicTypeInfo.LONG_TYPE_INFO); + listStateDescriptor.setQueryable("list-queryable"); + + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).process(new ProcessFunction, Object>() { + private static final long serialVersionUID = -805125545438296619L; + + private transient ListState listState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + listState = getRuntimeContext().getListState(listStateDescriptor); + } + + @Override + public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception { + listState.add(value.f1); + } + }); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // Now query + + Map> results = new HashMap<>(); + + for (int key = 0; key < maxParallelism; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + CompletableFuture> future = getKvStateWithRetries( + client, + jobId, + "list-queryable", + key, + BasicTypeInfo.INT_TYPE_INFO, + listStateDescriptor, + QUERY_RETRY_DELAY, + false, + executor); + + Iterable value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); + + Set res = new HashSet<>(); + for (Long v: value) { + res.add(v); + } + + // the source starts at 0, so +1 + if (res.size() == numElements + 1L) { + success = true; + results.put(key, res); + } else { + // Retry + Thread.sleep(50L); + } + } + + assertTrue("Did not succeed query", success); + } + + for (int key = 0; key < maxParallelism; key++) { + Set values = results.get(key); + for (long i = 0L; i <= numElements; i++) { + assertTrue(values.contains(i)); + } + } + + } finally { + // Free cluster resources + if (jobId != null) { + CompletableFuture cancellation = FutureUtils.toJava(cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); + + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } + + client.shutdown(); + } + } + + @Test + public void testAggregatingState() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + final QueryableStateClient client = new QueryableStateClient( + "localhost", + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream> source = env + .addSource(new TestAscendingValueSource(numElements)); + + final AggregatingStateDescriptor, MutableString, String> aggrStateDescriptor = + new AggregatingStateDescriptor<>( + "aggregates", + new SumAggr(), + MutableString.class); + aggrStateDescriptor.setQueryable("aggr-queryable"); + + source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; + + @Override + public Integer getKey(Tuple2 value) throws Exception { + return value.f0; + } + }).transform( + "TestAggregatingOperator", + BasicTypeInfo.STRING_TYPE_INFO, + new AggregatingTestOperator(aggrStateDescriptor) + ); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // Now query + + for (int key = 0; key < maxParallelism; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + CompletableFuture, String>> future = getKvStateWithRetries( + client, + jobId, + "aggr-queryable", + key, + BasicTypeInfo.INT_TYPE_INFO, + aggrStateDescriptor, + QUERY_RETRY_DELAY, + false, + executor); + + String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); + + if (Long.parseLong(value) == numElements * (numElements + 1L) / 2L) { + success = true; + } else { + // Retry + Thread.sleep(50L); + } + } + + assertTrue("Did not succeed query", success); + } + } finally { + // Free cluster resources + if (jobId != null) { + CompletableFuture cancellation = FutureUtils.toJava(cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply(CancellationSuccess.class))); + + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } + + client.shutdown(); + } + } + + ///// Sources/UDFs Used in the Tests ////// + /** * Test source producing (key, 0)..(key, maxValue) with key being the sub * task index. @@ -980,8 +1285,8 @@ public void cancel() { /** * Test source producing (key, 1) tuples with random key in key range (numKeys). */ - protected static class TestKeyRangeSource extends RichParallelSourceFunction> - implements CheckpointListener { + private static class TestKeyRangeSource extends RichParallelSourceFunction> implements CheckpointListener { + private static final long serialVersionUID = -5744725196953582710L; private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong(); @@ -997,7 +1302,7 @@ protected static class TestKeyRangeSource extends RichParallelSourceFunction> ctx) throws Exception { ctx.collect(record); } // mild slow down - Thread.sleep(1); + Thread.sleep(1L); } } @@ -1029,6 +1334,77 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { } } + /** + * An operator that uses {@link AggregatingState}. + * + *

The operator exists for lack of possibility to get an + * {@link AggregatingState} from the {@link org.apache.flink.api.common.functions.RuntimeContext}. + * If this were not the case, we could have a {@link ProcessFunction}. + */ + private static class AggregatingTestOperator + extends AbstractStreamOperator + implements OneInputStreamOperator, String> { + + private static final long serialVersionUID = 1L; + + private final AggregatingStateDescriptor, MutableString, String> stateDescriptor; + private transient AggregatingState, String> state; + + AggregatingTestOperator(AggregatingStateDescriptor, MutableString, String> stateDesc) { + this.stateDescriptor = stateDesc; + } + + @Override + public void open() throws Exception { + super.open(); + this.state = getKeyedStateBackend().getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + stateDescriptor); + } + + @Override + public void processElement(StreamRecord> element) throws Exception { + state.add(element.getValue()); + } + } + + /** + * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument. + */ + private static class SumAggr implements AggregateFunction, MutableString, String> { + + private static final long serialVersionUID = -6249227626701264599L; + + @Override + public MutableString createAccumulator() { + return new MutableString(); + } + + @Override + public void add(Tuple2 value, MutableString accumulator) { + long acc = Long.valueOf(accumulator.value); + acc += value.f1; + accumulator.value = Long.toString(acc); + } + + @Override + public String getResult(MutableString accumulator) { + return accumulator.value; + } + + @Override + public MutableString merge(MutableString a, MutableString b) { + MutableString nValue = new MutableString(); + nValue.value = Long.toString(Long.valueOf(a.value) + Long.valueOf(b.value)); + return nValue; + } + } + + private static final class MutableString { + String value = "0"; + } + /** * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument. */ @@ -1058,32 +1434,13 @@ public Tuple2 reduce(Tuple2 value1, Tuple2 Future getKvStateWithRetries( - final QueryableStateClient client, - final JobID jobId, - final String queryName, - final K key, - final TypeInformation keyTypeInfo, - final TypeSerializer valueTypeSerializer, - final Time retryDelay, - final boolean failForUnknownKeyOrNamespace, - final ScheduledExecutor executor) { - - return retryWithDelay( - () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer), - NO_OF_RETRIES, - retryDelay, - executor, - failForUnknownKeyOrNamespace); - } - - private static CompletableFuture getKvStateWithRetries( + private static CompletableFuture getKvStateWithRetries( final QueryableStateClient client, final JobID jobId, final String queryName, final K key, final TypeInformation keyTypeInfo, - final StateDescriptor stateDescriptor, + final StateDescriptor stateDescriptor, final Time retryDelay, final boolean failForUnknownKeyOrNamespace, final ScheduledExecutor executor) { @@ -1157,4 +1514,45 @@ public static void retryWithDelay( (t, throwable) -> operationResultFuture.cancel(false)); } } + + /** + * Retry a query for state for keys between 0 and {@link #maxParallelism} until + * expected equals the value of the result tuple's second field. + */ + private void executeValueQuery( + final Deadline deadline, + final QueryableStateClient client, + final JobID jobId, + final String queryableStateName, + final ValueStateDescriptor> stateDescriptor, + final long expected) throws Exception { + + for (int key = 0; key < maxParallelism; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + CompletableFuture>> future = getKvStateWithRetries( + client, + jobId, + queryableStateName, + key, + BasicTypeInfo.INT_TYPE_INFO, + stateDescriptor, + QUERY_RETRY_DELAY, + false, + executor); + + Tuple2 value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).value(); + + assertEquals("Key mismatch", key, value.f0.intValue()); + if (expected == value.f1) { + success = true; + } else { + // Retry + Thread.sleep(50L); + } + } + + assertTrue("Did not succeed query", success); + } + } } diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java new file mode 100644 index 0000000000000..69b2f61492b1f --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState; + +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableAggregatingStateTest}. + */ +public class ImmutableAggregatingStateTest { + + private final AggregatingStateDescriptor aggrStateDesc = + new AggregatingStateDescriptor<>( + "test", + new SumAggr(), + MutableString.class); + + private ImmutableAggregatingState aggrState; + + @Before + public void setUp() throws Exception { + if (!aggrStateDesc.isSerializerInitialized()) { + aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + final MutableString initValue = new MutableString(); + initValue.value = "42"; + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + aggrStateDesc.getSerializer().serialize(initValue, new DataOutputViewStreamWrapper(out)); + + aggrState = ImmutableAggregatingState.createState( + aggrStateDesc, + out.toByteArray() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + String value = aggrState.get(); + assertEquals("42", value); + + aggrState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + String value = aggrState.get(); + assertEquals("42", value); + + aggrState.clear(); + } + + /** + * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument. + */ + private static class SumAggr implements AggregateFunction { + + private static final long serialVersionUID = -6249227626701264599L; + + @Override + public MutableString createAccumulator() { + return new MutableString(); + } + + @Override + public void add(Long value, MutableString accumulator) { + accumulator.value += ", " + value; + } + + @Override + public String getResult(MutableString accumulator) { + return accumulator.value; + } + + @Override + public MutableString merge(MutableString a, MutableString b) { + MutableString nValue = new MutableString(); + nValue.value = a.value + ", " + b.value; + return nValue; + } + } + + private static final class MutableString { + String value; + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java new file mode 100644 index 0000000000000..d2c95351acd8e --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.queryablestate.client.state.ImmutableFoldingState; + +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableFoldingState}. + */ +public class ImmutableFoldingStateTest { + + private final FoldingStateDescriptor foldingStateDesc = + new FoldingStateDescriptor<>( + "test", + "0", + new SumFold(), + StringSerializer.INSTANCE); + + private ImmutableFoldingState foldingState; + + @Before + public void setUp() throws Exception { + if (!foldingStateDesc.isSerializerInitialized()) { + foldingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + StringSerializer.INSTANCE.serialize("42", new DataOutputViewStreamWrapper(out)); + + foldingState = ImmutableFoldingState.createState( + foldingStateDesc, + out.toByteArray() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + String value = foldingState.get(); + assertEquals("42", value); + + foldingState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + String value = foldingState.get(); + assertEquals("42", value); + + foldingState.clear(); + } + + /** + * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument. + */ + private static class SumFold implements FoldFunction { + + private static final long serialVersionUID = -6249227626701264599L; + + @Override + public String fold(String accumulator, Long value) throws Exception { + long acc = Long.valueOf(accumulator); + acc += value; + return Long.toString(acc); + } + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java new file mode 100644 index 0000000000000..3283295187379 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.queryablestate.client.state.ImmutableListState; +import org.apache.flink.runtime.state.heap.HeapListState; + +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableListState}. + */ +public class ImmutableListStateTest { + + private final ListStateDescriptor listStateDesc = + new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableListState listState; + + @Before + public void setUp() throws Exception { + if (!listStateDesc.isSerializerInitialized()) { + listStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + List init = new ArrayList<>(); + init.add(42L); + + byte[] serInit = serializeInitValue(init); + listState = ImmutableListState.createState(listStateDesc, serInit); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + List list = getStateContents(); + assertEquals(1L, list.size()); + + long element = list.get(0); + assertEquals(42L, element); + + listState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + List list = getStateContents(); + assertEquals(1L, list.size()); + + long element = list.get(0); + assertEquals(42L, element); + + listState.clear(); + } + + /** + * Copied from {@link HeapListState#getSerializedValue(Object, Object)}. + */ + private byte[] serializeInitValue(List toSerialize) throws IOException { + TypeSerializer serializer = listStateDesc.getElementSerializer(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos); + + // write the same as RocksDB writes lists, with one ',' separator + for (int i = 0; i < toSerialize.size(); i++) { + serializer.serialize(toSerialize.get(i), view); + if (i < toSerialize.size() - 1) { + view.writeByte(','); + } + } + view.flush(); + + return baos.toByteArray(); + } + + private List getStateContents() { + List list = new ArrayList<>(); + for (Long elem: listState.get()) { + list.add(elem); + } + return list; + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java new file mode 100644 index 0000000000000..30a8a50c64b58 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.queryablestate.client.state.ImmutableMapState; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; + +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests the {@link ImmutableMapState}. + */ +public class ImmutableMapStateTest { + + private final MapStateDescriptor mapStateDesc = + new MapStateDescriptor<>( + "test", + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableMapState mapState; + + @Before + public void setUp() throws Exception { + if (!mapStateDesc.isSerializerInitialized()) { + mapStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + Map initMap = new HashMap<>(); + initMap.put(1L, 5L); + initMap.put(2L, 5L); + + byte[] initSer = KvStateSerializer.serializeMap( + initMap.entrySet(), + BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()), + BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); + + mapState = ImmutableMapState.createState(mapStateDesc, initSer); + } + + @Test(expected = UnsupportedOperationException.class) + public void testPut() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + mapState.put(2L, 54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testPutAll() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Map nMap = new HashMap<>(); + nMap.put(1L, 7L); + nMap.put(2L, 7L); + + mapState.putAll(nMap); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + mapState.put(2L, 54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testIterator() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterator> iterator = mapState.iterator(); + while (iterator.hasNext()) { + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testIterable() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterable> iterable = mapState.entries(); + Iterator> iterator = iterable.iterator(); + while (iterator.hasNext()) { + assertEquals(5L, (long) iterator.next().getValue()); + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testKeys() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterator iterator = mapState.keys().iterator(); + while (iterator.hasNext()) { + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testValues() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterator iterator = mapState.values().iterator(); + while (iterator.hasNext()) { + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + mapState.clear(); + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java new file mode 100644 index 0000000000000..9b1ecf8d4b6b8 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.queryablestate.client.state.ImmutableReducingState; + +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableReducingState}. + */ +public class ImmutableReducingStateTest { + + private final ReducingStateDescriptor reducingStateDesc = + new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableReducingState reduceState; + + @Before + public void setUp() throws Exception { + if (!reducingStateDesc.isSerializerInitialized()) { + reducingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + reduceState = ImmutableReducingState.createState( + reducingStateDesc, + ByteBuffer.allocate(Long.BYTES).putLong(42L).array() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + long value = reduceState.get(); + assertEquals(42L, value); + + reduceState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + long value = reduceState.get(); + assertEquals(42L, value); + + reduceState.clear(); + } + + /** + * Test {@link ReduceFunction} summing up its two arguments. + */ + private static class SumReduce implements ReduceFunction { + + private static final long serialVersionUID = 6041237513913189144L; + + @Override + public Long reduce(Long value1, Long value2) throws Exception { + return value1 + value2; + } + } +} diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java new file mode 100644 index 0000000000000..5f7032d2eab22 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.queryablestate.client.state.ImmutableValueState; + +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableValueState}. + */ +public class ImmutableValueStateTest { + + private final ValueStateDescriptor valueStateDesc = + new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableValueState valueState; + + @Before + public void setUp() throws Exception { + if (!valueStateDesc.isSerializerInitialized()) { + valueStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + valueState = ImmutableValueState.createState( + valueStateDesc, + ByteBuffer.allocate(Long.BYTES).putLong(42L).array() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + long value = valueState.value(); + assertEquals(42L, value); + + valueState.update(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + long value = valueState.value(); + assertEquals(42L, value); + + valueState.clear(); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 51af430f649c1..ebcd7d57f0652 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -745,7 +745,7 @@ public QueryableStateStream asQueryableState( return new QueryableStateStream<>( queryableStateName, - stateDescriptor.getSerializer(), + stateDescriptor, getKeyType().createSerializer(getExecutionConfig())); } @@ -772,7 +772,7 @@ public QueryableStateStream asQueryableState( return new QueryableStateStream<>( queryableStateName, - stateDescriptor.getSerializer(), + stateDescriptor, getKeyType().createSerializer(getExecutionConfig())); } @@ -796,7 +796,7 @@ public QueryableStateStream asQueryableState( return new QueryableStateStream<>( queryableStateName, - stateDescriptor.getSerializer(), + stateDescriptor, getKeyType().createSerializer(getExecutionConfig())); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java index d0de2ab7f6bc0..7f20fd62d0f61 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.util.Preconditions; @@ -37,23 +38,23 @@ public class QueryableStateStream { /** Key serializer for the state instance. */ private final TypeSerializer keySerializer; - /** Value serializer for the state instance. */ - private final TypeSerializer valueSerializer; + /** State descriptor for the state instance. */ + private final StateDescriptor stateDescriptor; /** * Creates a queryable state stream. * * @param queryableStateName Name under which to publish the queryable state instance - * @param valueSerializer Value serializer for the state instance + * @param stateDescriptor The state descriptor for the state instance * @param keySerializer Key serializer for the state instance */ public QueryableStateStream( String queryableStateName, - TypeSerializer valueSerializer, + StateDescriptor stateDescriptor, TypeSerializer keySerializer) { this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Queryable state name"); - this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "Value serializer"); + this.stateDescriptor = Preconditions.checkNotNull(stateDescriptor, "State Descriptor"); this.keySerializer = Preconditions.checkNotNull(keySerializer, "Key serializer"); } @@ -66,15 +67,6 @@ public String getQueryableStateName() { return queryableStateName; } - /** - * Returns the value serializer for the queryable state instance. - * - * @return Value serializer for the state instance - */ - public TypeSerializer getValueSerializer() { - return valueSerializer; - } - /** * Returns the key serializer for the queryable state instance. * @@ -84,4 +76,12 @@ public TypeSerializer getKeySerializer() { return keySerializer; } + /** + * Returns the state descriptor for the queryable state instance. + * + * @return State descriptor for the state instance + */ + public StateDescriptor getStateDescriptor() { + return stateDescriptor; + } } diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index aaeb1ec3eab36..49bdbd9e70c41 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -497,7 +497,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] new QueryableStateStream( queryableStateName, - stateDescriptor.getSerializer, + stateDescriptor, getKeyType.createSerializer(executionConfig)) } @@ -522,7 +522,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] new QueryableStateStream( queryableStateName, - stateDescriptor.getSerializer, + stateDescriptor, getKeyType.createSerializer(executionConfig)) } @@ -546,7 +546,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] new QueryableStateStream( queryableStateName, - stateDescriptor.getSerializer, + stateDescriptor, getKeyType.createSerializer(executionConfig)) }