-
Notifications
You must be signed in to change notification settings - Fork 13k
/
QueryableStateClient.java
257 lines (227 loc) · 10.4 KB
/
QueryableStateClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.queryablestate.client;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.queryablestate.FutureUtils;
import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
/**
 * Client for querying Flink's managed state.
 *
 * <p>You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}.
 * The state instance created from this descriptor will be published for queries when it's
 * created on the Task Managers and the location will be reported to the Job Manager.
 *
 * <p>The client connects to a {@code Client Proxy} running on a given Task Manager. The
 * proxy is the entry point of the client to the Flink cluster. It forwards the requests
 * of the client to the Job Manager and the required Task Manager, and forwards the final
 * response back to the client.
 *
 * <p>The proxy initially resolves the location of the requested KvState via the JobManager. Resolved
 * locations are cached. When the server address of the requested KvState instance is determined, the
 * client sends out a request to the server. The returned final answer is then forwarded to the Client.
 */
@PublicEvolving
public class QueryableStateClient {

    private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);

    /** The client that forwards the requests to the proxy. */
    private final Client<KvStateRequest, KvStateResponse> client;

    /** The address of the proxy this client is connected to. */
    private final InetSocketAddress remoteAddress;

    /**
     * The execution configuration used to instantiate the different (de-)serializers.
     * Stays {@code null} until {@link #setExecutionConfig(ExecutionConfig)} is called.
     */
    private ExecutionConfig executionConfig;

    /**
     * Create the Queryable State Client.
     *
     * @param remoteHostname the hostname of the {@code Client Proxy} to connect to. Must not be {@code null}.
     * @param remotePort the port of the proxy to connect to. Must be in the range 0-65535.
     * @throws UnknownHostException if {@code remoteHostname} cannot be resolved to an address.
     */
    public QueryableStateClient(final String remoteHostname, final int remotePort) throws UnknownHostException {
        this(InetAddress.getByName(Preconditions.checkNotNull(remoteHostname)), remotePort);
    }

    /**
     * Create the Queryable State Client.
     *
     * @param remoteAddress the {@link InetAddress address} of the {@code Client Proxy} to connect to.
     * @param remotePort the port of the proxy to connect to. Must be in the range 0-65535.
     */
    public QueryableStateClient(final InetAddress remoteAddress, final int remotePort) {
        // 65535 is the largest valid port; InetSocketAddress itself rejects anything above it,
        // so the precondition must use the same bound to produce the intended error message.
        Preconditions.checkArgument(remotePort >= 0 && remotePort <= 65535,
                "Remote Port " + remotePort + " is out of valid port range (0-65535).");

        this.remoteAddress = new InetSocketAddress(remoteAddress, remotePort);

        final MessageSerializer<KvStateRequest, KvStateResponse> messageSerializer =
                new MessageSerializer<>(
                        new KvStateRequest.KvStateRequestDeserializer(),
                        new KvStateResponse.KvStateResponseDeserializer());

        // NOTE(review): the literal 1 is presumably the number of network threads of the
        // underlying client - confirm against the Client constructor.
        this.client = new Client<>(
                "Queryable State Client",
                1,
                messageSerializer,
                new DisabledKvStateRequestStats());
    }

    /** Shuts down the client. */
    public void shutdown() {
        client.shutdown();
    }

    /**
     * Gets the {@link ExecutionConfig}.
     *
     * @return The currently set configuration, or {@code null} if none was specified.
     */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /**
     * Replaces the existing {@link ExecutionConfig} (possibly {@code null}), with the provided one.
     *
     * @param config The new {@code configuration}.
     * @return The old configuration, or {@code null} if none was specified.
     */
    public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
        ExecutionConfig prev = executionConfig;
        this.executionConfig = config;
        return prev;
    }

    /**
     * Returns a future holding the request result.
     *
     * <p>Extracts the key's {@link TypeInformation} from the given hint and delegates to
     * {@link #getKvState(JobID, String, Object, TypeInformation, StateDescriptor)}.
     *
     * @param jobId                     JobID of the job the queryable state belongs to.
     * @param queryableStateName        Name under which the state is queryable.
     * @param key                       The key we are interested in.
     * @param keyTypeHint               A {@link TypeHint} used to extract the type of the key.
     *                                  Must not be {@code null}.
     * @param stateDescriptor           The {@link StateDescriptor} of the state we want to query.
     * @return Future holding the immutable {@link State} object containing the result.
     */
    @PublicEvolving
    public <K, S extends State, V> CompletableFuture<S> getKvState(
            final JobID jobId,
            final String queryableStateName,
            final K key,
            final TypeHint<K> keyTypeHint,
            final StateDescriptor<S, V> stateDescriptor) {

        Preconditions.checkNotNull(keyTypeHint);

        TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo();
        return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor);
    }

    /**
     * Returns a future holding the request result.
     *
     * <p>Queries the state under the {@link VoidNamespace}.
     *
     * @param jobId                     JobID of the job the queryable state belongs to.
     * @param queryableStateName        Name under which the state is queryable.
     * @param key                       The key we are interested in.
     * @param keyTypeInfo               The {@link TypeInformation} of the key.
     * @param stateDescriptor           The {@link StateDescriptor} of the state we want to query.
     * @return Future holding the immutable {@link State} object containing the result.
     */
    @PublicEvolving
    public <K, S extends State, V> CompletableFuture<S> getKvState(
            final JobID jobId,
            final String queryableStateName,
            final K key,
            final TypeInformation<K> keyTypeInfo,
            final StateDescriptor<S, V> stateDescriptor) {

        return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE,
                keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
    }

    /**
     * Returns a future holding the request result.
     *
     * <p>All arguments must be non-{@code null}. The key and namespace are serialized with
     * serializers created from the given type information and the currently set
     * {@link ExecutionConfig} (possibly {@code null}). If that serialization fails with an
     * {@link IOException}, the returned future is already failed with that exception. If binding
     * the response to the state descriptor fails, the returned future completes exceptionally
     * with a {@link FlinkRuntimeException} wrapping the cause.
     *
     * @param jobId                     JobID of the job the queryable state belongs to.
     * @param queryableStateName        Name under which the state is queryable.
     * @param key                       The key that the state we request is associated with.
     * @param namespace                 The namespace of the state.
     * @param keyTypeInfo               The {@link TypeInformation} of the keys.
     * @param namespaceTypeInfo         The {@link TypeInformation} of the namespace.
     * @param stateDescriptor           The {@link StateDescriptor} of the state we want to query.
     * @return Future holding the immutable {@link State} object containing the result.
     */
    @PublicEvolving
    public <K, N, S extends State, V> CompletableFuture<S> getKvState(
            final JobID jobId,
            final String queryableStateName,
            final K key,
            final N namespace,
            final TypeInformation<K> keyTypeInfo,
            final TypeInformation<N> namespaceTypeInfo,
            final StateDescriptor<S, V> stateDescriptor) {

        Preconditions.checkNotNull(jobId);
        Preconditions.checkNotNull(queryableStateName);
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(namespace);

        Preconditions.checkNotNull(keyTypeInfo);
        Preconditions.checkNotNull(namespaceTypeInfo);
        Preconditions.checkNotNull(stateDescriptor);

        TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig);
        TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig);

        stateDescriptor.initializeSerializerUnlessSet(executionConfig);

        final byte[] serializedKeyAndNamespace;
        try {
            serializedKeyAndNamespace = KvStateSerializer
                    .serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
        } catch (IOException e) {
            // Report the failure through the future instead of throwing to the caller.
            return FutureUtils.getFailedFuture(e);
        }

        return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
                stateResponse -> {
                    try {
                        return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
                    } catch (Exception e) {
                        // Lambdas passed to thenApply cannot throw checked exceptions.
                        throw new FlinkRuntimeException(e);
                    }
                });
    }

    /**
     * Returns a future holding the serialized request result.
     *
     * <p>Any exception raised while building or sending the request is logged and returned
     * as a failed future rather than thrown.
     *
     * @param jobId                     JobID of the job the queryable state
     *                                  belongs to
     * @param queryableStateName        Name under which the state is queryable
     * @param keyHashCode               Integer hash code of the key (result of
     *                                  a call to {@link Object#hashCode()})
     * @param serializedKeyAndNamespace Serialized key and namespace to query
     *                                  KvState instance with
     * @return Future holding the serialized result
     */
    private CompletableFuture<KvStateResponse> getKvState(
            final JobID jobId,
            final String queryableStateName,
            final int keyHashCode,
            final byte[] serializedKeyAndNamespace) {
        LOG.info("Sending State Request to {}.", remoteAddress);
        try {
            KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace);
            return client.sendRequest(remoteAddress, request);
        } catch (Exception e) {
            LOG.error("Unable to send KVStateRequest: ", e);
            return FutureUtils.getFailedFuture(e);
        }
    }
}