Skip to content

Commit

Permalink
[FLINK-7770][QS] Hide the queryable state behind a proxy.
Browse files Browse the repository at this point in the history
Previously the QueryableStateClient could connect to the JM
and the TMs directly to fetch the required state. Now, there
is a proxy running on each TM and the remote client connects
to one of these proxies in order to get its state. The proxy
receives the request from the client, performs all necessary
message exchanges within the Flink cluster, receives the state
and forwards it back to the client.

This architecture allows for more security features to be
integrated in the future, as the proxy is running in the
cluster, it exposes less information about the cluster to
the outside world, and is more suitable for containerized
environments.
  • Loading branch information
kl0u committed Oct 11, 2017
1 parent 29a6e99 commit f48f534
Show file tree
Hide file tree
Showing 52 changed files with 2,753 additions and 3,179 deletions.
Expand Up @@ -40,7 +40,7 @@ public class QueryableStateOptions {
/** Port to bind KvState server to (0 => pick random available port). */
public static final ConfigOption<Integer> SERVER_PORT =
key("query.server.port")
.defaultValue(0);
.defaultValue(9069);

/** Number of network (event loop) threads for the KvState server (0 => #slots). */
public static final ConfigOption<Integer> SERVER_NETWORK_THREADS =
Expand Down
Expand Up @@ -150,7 +150,7 @@ public int hashCode() {
((int) this.upperPart) ^
((int) (this.upperPart >>> 32));
}

@Override
public String toString() {
if (this.toString == null) {
Expand All @@ -163,7 +163,7 @@ public String toString() {

return this.toString;
}

@Override
public int compareTo(AbstractID o) {
int diff1 = (this.upperPart < o.upperPart) ? -1 : ((this.upperPart == o.upperPart) ? 0 : 1);
Expand Down
Expand Up @@ -18,18 +18,19 @@

package org.apache.flink.queryablestate;

import org.apache.flink.queryablestate.client.KvStateLocationLookupService;
import org.apache.flink.annotation.Internal;

/**
* Exception to fail Future with if no JobManager is currently registered at
* the {@link KvStateLocationLookupService}.
* Exception to fail Future if the Task Manager on which the
* {@link org.apache.flink.runtime.query.KvStateClientProxy}
* is running on, does not know the active Job Manager.
*/
public class UnknownJobManager extends Exception {
@Internal
public class UnknownJobManagerException extends Exception {

private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 9092442511708951209L;

public UnknownJobManager() {
super("Unknown JobManager. Either the JobManager has not registered yet " +
"or has lost leadership.");
public UnknownJobManagerException() {
super("Unknown JobManager. Either the JobManager has not registered yet or has lost leadership.");
}
}
Expand Up @@ -18,14 +18,22 @@

package org.apache.flink.queryablestate;

import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.BadRequestException;

/**
* Thrown if the KvState does not hold any state for the given key or namespace.
*/
public class UnknownKeyOrNamespace extends IllegalStateException {
@Internal
public class UnknownKeyOrNamespaceException extends BadRequestException {

private static final long serialVersionUID = 1L;

public UnknownKeyOrNamespace() {
super("KvState does not hold any state for key/namespace.");
/**
* Creates the exception.
* @param serverName the name of the server that threw the exception.
*/
public UnknownKeyOrNamespaceException(String serverName) {
super(serverName, "No state for the specified key/namespace.");
}
}
Expand Up @@ -18,18 +18,25 @@

package org.apache.flink.queryablestate;

import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.BadRequestException;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.util.Preconditions;

/**
* Thrown if no KvState with the given ID cannot found by the server handler.
*/
public class UnknownKvStateID extends IllegalStateException {
@Internal
public class UnknownKvStateIdException extends BadRequestException {

private static final long serialVersionUID = 1L;

public UnknownKvStateID(KvStateID kvStateId) {
super("No KvState registered with ID " + Preconditions.checkNotNull(kvStateId, "KvStateID") +
" at TaskManager.");
/**
* Creates the exception.
* @param serverName the name of the server that threw the exception.
* @param kvStateId the state id for which no state was found.
*/
public UnknownKvStateIdException(String serverName, KvStateID kvStateId) {
super(serverName, "No registered state with ID " + Preconditions.checkNotNull(kvStateId) + '.');
}
}
Expand Up @@ -18,14 +18,24 @@

package org.apache.flink.queryablestate;

import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.network.BadRequestException;
import org.apache.flink.runtime.query.KvStateLocation;

/**
* Exception thrown if there is no location information available for the given
* key group in a {@link KvStateLocation} instance.
*/
public class UnknownKvStateKeyGroupLocation extends Exception {
@Internal
public class UnknownKvStateKeyGroupLocationException extends BadRequestException {

private static final long serialVersionUID = 1L;

/**
* Creates the exception.
* @param serverName the name of the server that threw the exception.
*/
public UnknownKvStateKeyGroupLocationException(String serverName) {
super(serverName, "Unknown key-group location.");
}
}

0 comments on commit f48f534

Please sign in to comment.