Skip to content

Commit

Permalink
[FLINK-7044] [qs] Allow to specify namespace and descriptor in query.
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u authored and aljoscha committed Jul 6, 2017
1 parent 6eae45e commit 6c05abe
Show file tree
Hide file tree
Showing 5 changed files with 550 additions and 117 deletions.
Expand Up @@ -22,8 +22,14 @@
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
Expand All @@ -38,7 +44,10 @@
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,6 +59,7 @@
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -95,6 +105,8 @@ public class QueryableStateClient {
/** This is != null, if we started the actor system. */
private final ActorSystem actorSystem;

private ExecutionConfig executionConfig;

/**
* Creates a client from the given configuration.
*
Expand Down Expand Up @@ -157,10 +169,21 @@ public QueryableStateClient(
this.lookupService = lookupService;
this.kvStateClient = networkClient;
this.executionContext = actorSystem.dispatcher();
this.executionConfig = new ExecutionConfig();

this.lookupService.start();
}

/** Gets the {@link ExecutionConfig}. */
public ExecutionConfig getExecutionConfig() {
return executionConfig;
}

/** Sets the {@link ExecutionConfig}. */
public void setExecutionConfig(ExecutionConfig config) {
this.executionConfig = config;
}

/**
* Creates a client.
*
Expand Down Expand Up @@ -266,6 +289,181 @@ public Future<byte[]> recover(Throwable failure) throws Throwable {
}, executionContext);
}

/**
* Returns a future holding the request result.
*
* <p>If the server does not serve a KvState instance with the given ID,
* the Future will be failed with a {@link UnknownKvStateID}.
*
* <p>If the KvState instance does not hold any data for the given key
* and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
*
* <p>All other failures are forwarded to the Future.
*
* @param jobId JobID of the job the queryable state belongs to.
* @param queryableStateName Name under which the state is queryable.
* @param key The key we are interested in.
* @param keyTypeHint A {@link TypeHint} used to extract the type of the key.
* @param stateDescriptor The {@link StateDescriptor} of the state we want to query.
* @return Future holding the result.
*/
@PublicEvolving
public <K, V> Future<V> getKvState(
final JobID jobId,
final String queryableStateName,
final K key,
final TypeHint<K> keyTypeHint,
final StateDescriptor<?, V> stateDescriptor) {

Preconditions.checkNotNull(keyTypeHint);

TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo();
return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor);
}

/**
* Returns a future holding the request result.
*
* <p>If the server does not serve a KvState instance with the given ID,
* the Future will be failed with a {@link UnknownKvStateID}.
*
* <p>If the KvState instance does not hold any data for the given key
* and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
*
* <p>All other failures are forwarded to the Future.
*
* @param jobId JobID of the job the queryable state belongs to.
* @param queryableStateName Name under which the state is queryable.
* @param key The key we are interested in.
* @param keyTypeInfo The {@link TypeInformation} of the key.
* @param stateDescriptor The {@link StateDescriptor} of the state we want to query.
* @return Future holding the result.
*/
@PublicEvolving
public <K, V> Future<V> getKvState(
final JobID jobId,
final String queryableStateName,
final K key,
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<?, V> stateDescriptor) {

Preconditions.checkNotNull(keyTypeInfo);

return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE,
keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
}

/**
* Returns a future holding the request result.
*
* <p>If the server does not serve a KvState instance with the given ID,
* the Future will be failed with a {@link UnknownKvStateID}.
*
* <p>If the KvState instance does not hold any data for the given key
* and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
*
* <p>All other failures are forwarded to the Future.
*
* @param jobId JobID of the job the queryable state belongs to.
* @param queryableStateName Name under which the state is queryable.
* @param key The key that the state we request is associated with.
* @param namespace The namespace of the state.
* @param keyTypeInfo The {@link TypeInformation} of the keys.
* @param namespaceTypeInfo The {@link TypeInformation} of the namespace.
* @param stateDescriptor The {@link StateDescriptor} of the state we want to query.
* @return Future holding the result.
*/
@PublicEvolving
public <K, V, N> Future<V> getKvState(
final JobID jobId,
final String queryableStateName,
final K key,
final N namespace,
final TypeInformation<K> keyTypeInfo,
final TypeInformation<N> namespaceTypeInfo,
final StateDescriptor<?, V> stateDescriptor) {

Preconditions.checkNotNull(stateDescriptor);

// initialize the value serializer based on the execution config.
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
TypeSerializer<V> stateSerializer = stateDescriptor.getSerializer();

return getKvState(jobId, queryableStateName, key,
namespace, keyTypeInfo, namespaceTypeInfo, stateSerializer);
}

/**
* Returns a future holding the request result.
*
* <p>If the server does not serve a KvState instance with the given ID,
* the Future will be failed with a {@link UnknownKvStateID}.
*
* <p>If the KvState instance does not hold any data for the given key
* and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
*
* <p>All other failures are forwarded to the Future.
*
* @param jobId JobID of the job the queryable state belongs to.
* @param queryableStateName Name under which the state is queryable.
* @param key The key that the state we request is associated with.
* @param namespace The namespace of the state.
* @param keyTypeInfo The {@link TypeInformation} of the keys.
* @param namespaceTypeInfo The {@link TypeInformation} of the namespace.
* @param stateSerializer The {@link TypeSerializer} of the state we want to query.
* @return Future holding the result.
*/
@PublicEvolving
public <K, V, N> Future<V> getKvState(
final JobID jobId,
final String queryableStateName,
final K key,
final N namespace,
final TypeInformation<K> keyTypeInfo,
final TypeInformation<N> namespaceTypeInfo,
final TypeSerializer<V> stateSerializer) {

Preconditions.checkNotNull(queryableStateName);

Preconditions.checkNotNull(key);
Preconditions.checkNotNull(namespace);

Preconditions.checkNotNull(keyTypeInfo);
Preconditions.checkNotNull(namespaceTypeInfo);
Preconditions.checkNotNull(stateSerializer);

if (stateSerializer instanceof ListSerializer) {
throw new IllegalArgumentException("ListState is not supported out-of-the-box yet.");
}

TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig);
TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig);

final byte[] serializedKeyAndNamespace;
try {
serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
key,
keySerializer,
namespace,
namespaceSerializer);
} catch (IOException e) {
return Futures.failed(e);
}

return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace)
.flatMap(new Mapper<byte[], Future<V>>() {
@Override
public Future<V> apply(byte[] parameter) {
try {
return Futures.successful(
KvStateRequestSerializer.deserializeValue(parameter, stateSerializer));
} catch (IOException e) {
return Futures.failed(e);
}
}
}, executionContext);
}

/**
* Returns a future holding the serialized request result.
*
Expand Down
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

/**
* {@link TypeInformation} for {@link VoidNamespace}.
*/
@Public
public class VoidNamespaceTypeInfo extends TypeInformation<VoidNamespace> {

private static final long serialVersionUID = 5453679706408610586L;

public static final VoidNamespaceTypeInfo INSTANCE = new VoidNamespaceTypeInfo();

@Override
@PublicEvolving
public boolean isBasicType() {
return false;
}

@Override
@PublicEvolving
public boolean isTupleType() {
return false;
}

@Override
@PublicEvolving
public int getArity() {
return 0;
}

@Override
@PublicEvolving
public int getTotalFields() {
return 0;
}

@Override
@PublicEvolving
public Class<VoidNamespace> getTypeClass() {
return VoidNamespace.class;
}

@Override
@PublicEvolving
public boolean isKeyType() {
return false;
}

@Override
@PublicEvolving
public TypeSerializer<VoidNamespace> createSerializer(ExecutionConfig config) {
return VoidNamespaceSerializer.INSTANCE;
}

@Override
@PublicEvolving
public String toString() {
return "VoidNamespaceTypeInfo";
}

@Override
@PublicEvolving
public boolean equals(Object obj) {
return this == obj || obj instanceof VoidNamespaceTypeInfo;
}

@Override
@PublicEvolving
public int hashCode() {
return 0;
}

@Override
@PublicEvolving
public boolean canEqual(Object obj) {
return obj instanceof VoidNamespaceTypeInfo;
}
}

0 comments on commit 6c05abe

Please sign in to comment.