Skip to content

Commit

Permalink
[FLINK-7826][QS] Add support for all types of state to the QS Client.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Oct 18, 2017
1 parent 717a7dc commit abc3e1c
Show file tree
Hide file tree
Showing 21 changed files with 1,817 additions and 188 deletions.
Expand Up @@ -30,7 +30,7 @@
* {@link StateDescriptor} for {@link FoldingState}. This can be used to create partitioned * {@link StateDescriptor} for {@link FoldingState}. This can be used to create partitioned
* folding state. * folding state.
* *
* @param <T> Type of the values folded int othe state * @param <T> Type of the values folded in the other state
* @param <ACC> Type of the value in the state * @param <ACC> Type of the value in the state
* *
* @deprecated will be removed in a future version in favor of {@link AggregatingStateDescriptor} * @deprecated will be removed in a future version in favor of {@link AggregatingStateDescriptor}
Expand Down
Expand Up @@ -21,10 +21,12 @@
import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
import org.apache.flink.queryablestate.messages.KvStateRequest; import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse; import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.Client; import org.apache.flink.queryablestate.network.Client;
Expand Down Expand Up @@ -141,15 +143,15 @@ public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
* @param key The key we are interested in. * @param key The key we are interested in.
* @param keyTypeHint A {@link TypeHint} used to extract the type of the key. * @param keyTypeHint A {@link TypeHint} used to extract the type of the key.
* @param stateDescriptor The {@link StateDescriptor} of the state we want to query. * @param stateDescriptor The {@link StateDescriptor} of the state we want to query.
* @return Future holding the result. * @return Future holding the immutable {@link State} object containing the result.
*/ */
@PublicEvolving @PublicEvolving
public <K, V> CompletableFuture<V> getKvState( public <K, S extends State, V> CompletableFuture<S> getKvState(
final JobID jobId, final JobID jobId,
final String queryableStateName, final String queryableStateName,
final K key, final K key,
final TypeHint<K> keyTypeHint, final TypeHint<K> keyTypeHint,
final StateDescriptor<?, V> stateDescriptor) { final StateDescriptor<S, V> stateDescriptor) {


Preconditions.checkNotNull(keyTypeHint); Preconditions.checkNotNull(keyTypeHint);


Expand All @@ -164,15 +166,15 @@ public <K, V> CompletableFuture<V> getKvState(
* @param key The key we are interested in. * @param key The key we are interested in.
* @param keyTypeInfo The {@link TypeInformation} of the key. * @param keyTypeInfo The {@link TypeInformation} of the key.
* @param stateDescriptor The {@link StateDescriptor} of the state we want to query. * @param stateDescriptor The {@link StateDescriptor} of the state we want to query.
* @return Future holding the result. * @return Future holding the immutable {@link State} object containing the result.
*/ */
@PublicEvolving @PublicEvolving
public <K, V> CompletableFuture<V> getKvState( public <K, S extends State, V> CompletableFuture<S> getKvState(
final JobID jobId, final JobID jobId,
final String queryableStateName, final String queryableStateName,
final K key, final K key,
final TypeInformation<K> keyTypeInfo, final TypeInformation<K> keyTypeInfo,
final StateDescriptor<?, V> stateDescriptor) { final StateDescriptor<S, V> stateDescriptor) {


return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE,
keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
Expand All @@ -187,48 +189,17 @@ public <K, V> CompletableFuture<V> getKvState(
* @param keyTypeInfo The {@link TypeInformation} of the keys. * @param keyTypeInfo The {@link TypeInformation} of the keys.
* @param namespaceTypeInfo The {@link TypeInformation} of the namespace. * @param namespaceTypeInfo The {@link TypeInformation} of the namespace.
* @param stateDescriptor The {@link StateDescriptor} of the state we want to query. * @param stateDescriptor The {@link StateDescriptor} of the state we want to query.
* @return Future holding the result. * @return Future holding the immutable {@link State} object containing the result.
*/ */
@PublicEvolving @PublicEvolving
public <K, V, N> CompletableFuture<V> getKvState( public <K, N, S extends State, V> CompletableFuture<S> getKvState(
final JobID jobId, final JobID jobId,
final String queryableStateName, final String queryableStateName,
final K key, final K key,
final N namespace, final N namespace,
final TypeInformation<K> keyTypeInfo, final TypeInformation<K> keyTypeInfo,
final TypeInformation<N> namespaceTypeInfo, final TypeInformation<N> namespaceTypeInfo,
final StateDescriptor<?, V> stateDescriptor) { final StateDescriptor<S, V> stateDescriptor) {

Preconditions.checkNotNull(stateDescriptor);

// initialize the value serializer based on the execution config.
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
TypeSerializer<V> stateSerializer = stateDescriptor.getSerializer();

return getKvState(jobId, queryableStateName, key,
namespace, keyTypeInfo, namespaceTypeInfo, stateSerializer);
}

/**
* Returns a future holding the request result.
* @param jobId JobID of the job the queryable state belongs to.
* @param queryableStateName Name under which the state is queryable.
* @param key The key that the state we request is associated with.
* @param namespace The namespace of the state.
* @param keyTypeInfo The {@link TypeInformation} of the keys.
* @param namespaceTypeInfo The {@link TypeInformation} of the namespace.
* @param stateSerializer The {@link TypeSerializer} of the state we want to query.
* @return Future holding the result.
*/
@PublicEvolving
public <K, N, V> CompletableFuture<V> getKvState(
final JobID jobId,
final String queryableStateName,
final K key,
final N namespace,
final TypeInformation<K> keyTypeInfo,
final TypeInformation<N> namespaceTypeInfo,
final TypeSerializer<V> stateSerializer) {


Preconditions.checkNotNull(jobId); Preconditions.checkNotNull(jobId);
Preconditions.checkNotNull(queryableStateName); Preconditions.checkNotNull(queryableStateName);
Expand All @@ -237,7 +208,7 @@ public <K, N, V> CompletableFuture<V> getKvState(


Preconditions.checkNotNull(keyTypeInfo); Preconditions.checkNotNull(keyTypeInfo);
Preconditions.checkNotNull(namespaceTypeInfo); Preconditions.checkNotNull(namespaceTypeInfo);
Preconditions.checkNotNull(stateSerializer); Preconditions.checkNotNull(stateDescriptor);


TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig); TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig);
TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig); TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig);
Expand All @@ -253,8 +224,8 @@ public <K, N, V> CompletableFuture<V> getKvState(
return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply( return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
stateResponse -> { stateResponse -> {
try { try {
return KvStateSerializer.deserializeValue(stateResponse.getContent(), stateSerializer); return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
} catch (IOException e) { } catch (Exception e) {
throw new FlinkRuntimeException(e); throw new FlinkRuntimeException(e);
} }
}); });
Expand Down
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.queryablestate.client.state;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

/**
* A read-only {@link AggregatingState} that <b>does not</b> allow for modifications.
*
* <p>This is the type of the result returned when querying Flink's keyed state using the
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link AggregatingStateDescriptor}.
*/
@PublicEvolving
public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState implements AggregatingState<IN, OUT> {

private final OUT value;

private ImmutableAggregatingState(OUT value) {
this.value = Preconditions.checkNotNull(value);
}

@Override
public OUT get() {
return value;
}

@Override
public void add(Object newValue) {
throw MODIFICATION_ATTEMPT_ERROR;
}

@Override
public void clear() {
throw MODIFICATION_ATTEMPT_ERROR;
}

public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> createState(
final AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor,
final byte[] serializedValue) throws IOException {

final ACC accumulator = KvStateSerializer.deserializeValue(
serializedValue,
stateDescriptor.getSerializer());

final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator);
return new ImmutableAggregatingState<>(state);
}
}
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.queryablestate.client.state;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

/**
* A read-only {@link FoldingState} that does not allow for modifications.
*
* <p>This is the result returned when querying Flink's keyed state using the
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link FoldingStateDescriptor}.
*/
@PublicEvolving
@Deprecated
public final class ImmutableFoldingState<IN, ACC> extends ImmutableState implements FoldingState<IN, ACC> {

private final ACC value;

private ImmutableFoldingState(ACC value) {
this.value = Preconditions.checkNotNull(value);
}

@Override
public ACC get() {
return value;
}

@Override
public void add(Object newValue) {
throw MODIFICATION_ATTEMPT_ERROR;
}

@Override
public void clear() {
throw MODIFICATION_ATTEMPT_ERROR;
}

public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState(
final FoldingStateDescriptor<IN, ACC> stateDescriptor,
final byte[] serializedState) throws IOException {

final ACC state = KvStateSerializer.deserializeValue(
serializedState,
stateDescriptor.getSerializer());
return new ImmutableFoldingState<>(state);
}
}
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.queryablestate.client.state;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.List;

/**
* A read-only {@link ListState} that does not allow for modifications.
*
* <p>This is the result returned when querying Flink's keyed state using the
* {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
* providing an {@link ListStateDescriptor}.
*/
@PublicEvolving
public final class ImmutableListState<V> extends ImmutableState implements ListState<V> {

private final List<V> listState;

private ImmutableListState(final List<V> state) {
this.listState = Preconditions.checkNotNull(state);
}

@Override
public Iterable<V> get() {
return listState;
}

@Override
public void add(V value) {
throw MODIFICATION_ATTEMPT_ERROR;
}

@Override
public void clear() {
throw MODIFICATION_ATTEMPT_ERROR;
}

public static <V> ImmutableListState<V> createState(
final ListStateDescriptor<V> stateDescriptor,
final byte[] serializedState) throws IOException {

final List<V> state = KvStateSerializer.deserializeList(
serializedState,
stateDescriptor.getElementSerializer());
return new ImmutableListState<>(state);
}
}

0 comments on commit abc3e1c

Please sign in to comment.