-
Notifications
You must be signed in to change notification settings - Fork 13k
/
StateTable.java
441 lines (375 loc) · 16.5 KB
/
StateTable.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.state.heap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.IterableStateSnapshot;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateEntry;
import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalKvState.StateIncrementalVisitor;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* Base class for state tables. Accesses to state are typically scoped by the currently active key,
* as provided through the {@link InternalKeyContext}.
*
* @param <K> type of key
* @param <N> type of namespace
* @param <S> type of state
*/
public abstract class StateTable<K, N, S>
implements StateSnapshotRestore, Iterable<StateEntry<K, N, S>> {
/**
* The key context view on the backend. This provides information, such as the currently active
* key.
*/
protected final InternalKeyContext<K> keyContext;
/** Combined meta information such as name and serializers for this state. */
protected RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo;
/** The serializer of the key. */
protected final TypeSerializer<K> keySerializer;
/** The current key group range. */
protected final KeyGroupRange keyGroupRange;
/**
* Map for holding the actual state objects. The outer array represents the key-groups. All
* array positions will be initialized with an empty state map.
*/
protected final StateMap<K, N, S>[] keyGroupedStateMaps;
/**
* @param keyContext the key context provides the key scope for all put/get/delete operations.
* @param metaInfo the meta information, including the type serializer for state copy-on-write.
* @param keySerializer the serializer of the key.
*/
public StateTable(
InternalKeyContext<K> keyContext,
RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo,
TypeSerializer<K> keySerializer) {
this.keyContext = Preconditions.checkNotNull(keyContext);
this.metaInfo = Preconditions.checkNotNull(metaInfo);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.keyGroupRange = keyContext.getKeyGroupRange();
@SuppressWarnings("unchecked")
StateMap<K, N, S>[] state =
(StateMap<K, N, S>[])
new StateMap[keyContext.getKeyGroupRange().getNumberOfKeyGroups()];
this.keyGroupedStateMaps = state;
for (int i = 0; i < this.keyGroupedStateMaps.length; i++) {
this.keyGroupedStateMaps[i] = createStateMap();
}
}
protected abstract StateMap<K, N, S> createStateMap();
@Override
@Nonnull
public abstract IterableStateSnapshot<K, N, S> stateSnapshot();
// Main interface methods of StateTable -------------------------------------------------------
/**
* Returns whether this {@link StateTable} is empty.
*
* @return {@code true} if this {@link StateTable} has no elements, {@code false} otherwise.
* @see #size()
*/
public boolean isEmpty() {
return size() == 0;
}
/**
* Returns the total number of entries in this {@link StateTable}. This is the sum of both
* sub-tables.
*
* @return the number of entries in this {@link StateTable}.
*/
public int size() {
int count = 0;
for (StateMap<K, N, S> stateMap : keyGroupedStateMaps) {
count += stateMap.size();
}
return count;
}
/**
* Returns the state of the mapping for the composite of active key and given namespace.
*
* @param namespace the namespace. Not null.
* @return the states of the mapping with the specified key/namespace composite key, or {@code
* null} if no mapping for the specified key is found.
*/
public S get(N namespace) {
return get(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
}
/**
* Returns whether this table contains a mapping for the composite of active key and given
* namespace.
*
* @param namespace the namespace in the composite key to search for. Not null.
* @return {@code true} if this map contains the specified key/namespace composite key, {@code
* false} otherwise.
*/
public boolean containsKey(N namespace) {
return containsKey(
keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
}
/**
* Maps the composite of active key and given namespace to the specified state.
*
* @param namespace the namespace. Not null.
* @param state the state. Can be null.
*/
public void put(N namespace, S state) {
put(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace, state);
}
/**
* Removes the mapping for the composite of active key and given namespace. This method should
* be preferred over {@link #removeAndGetOld(N)} when the caller is not interested in the old
* state.
*
* @param namespace the namespace of the mapping to remove. Not null.
*/
public void remove(N namespace) {
remove(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
}
/**
* Removes the mapping for the composite of active key and given namespace, returning the state
* that was found under the entry.
*
* @param namespace the namespace of the mapping to remove. Not null.
* @return the state of the removed mapping or {@code null} if no mapping for the specified key
* was found.
*/
public S removeAndGetOld(N namespace) {
return removeAndGetOld(
keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);
}
/**
* Applies the given {@link StateTransformationFunction} to the state (1st input argument),
* using the given value as second input argument. The result of {@link
* StateTransformationFunction#apply(Object, Object)} is then stored as the new state. This
* function is basically an optimization for get-update-put pattern.
*
* @param namespace the namespace. Not null.
* @param value the value to use in transforming the state. Can be null.
* @param transformation the transformation function.
* @throws Exception if some exception happens in the transformation function.
*/
public <T> void transform(
N namespace, T value, StateTransformationFunction<S, T> transformation)
throws Exception {
K key = keyContext.getCurrentKey();
checkKeyNamespacePreconditions(key, namespace);
int keyGroup = keyContext.getCurrentKeyGroupIndex();
getMapForKeyGroup(keyGroup).transform(key, namespace, value, transformation);
}
// For queryable state ------------------------------------------------------------------------
/**
* Returns the state for the composite of active key and given namespace. This is typically used
* by queryable state.
*
* @param key the key. Not null.
* @param namespace the namespace. Not null.
* @return the state of the mapping with the specified key/namespace composite key, or {@code
* null} if no mapping for the specified key is found.
*/
public S get(K key, N namespace) {
int keyGroup =
KeyGroupRangeAssignment.assignToKeyGroup(key, keyContext.getNumberOfKeyGroups());
return get(key, keyGroup, namespace);
}
public Stream<K> getKeys(N namespace) {
return Arrays.stream(keyGroupedStateMaps)
.flatMap(
stateMap ->
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(stateMap.iterator(), 0),
false))
.filter(entry -> entry.getNamespace().equals(namespace))
.map(StateEntry::getKey);
}
public Stream<Tuple2<K, N>> getKeysAndNamespaces() {
return Arrays.stream(keyGroupedStateMaps)
.flatMap(
stateMap ->
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(stateMap.iterator(), 0),
false))
.map(entry -> Tuple2.of(entry.getKey(), entry.getNamespace()));
}
public StateIncrementalVisitor<K, N, S> getStateIncrementalVisitor(
int recommendedMaxNumberOfReturnedRecords) {
return new StateEntryIterator(recommendedMaxNumberOfReturnedRecords);
}
// ------------------------------------------------------------------------
private S get(K key, int keyGroupIndex, N namespace) {
checkKeyNamespacePreconditions(key, namespace);
return getMapForKeyGroup(keyGroupIndex).get(key, namespace);
}
private boolean containsKey(K key, int keyGroupIndex, N namespace) {
checkKeyNamespacePreconditions(key, namespace);
return getMapForKeyGroup(keyGroupIndex).containsKey(key, namespace);
}
private void checkKeyNamespacePreconditions(K key, N namespace) {
Preconditions.checkNotNull(
key, "No key set. This method should not be called outside of a keyed context.");
Preconditions.checkNotNull(namespace, "Provided namespace is null.");
}
private void remove(K key, int keyGroupIndex, N namespace) {
checkKeyNamespacePreconditions(key, namespace);
getMapForKeyGroup(keyGroupIndex).remove(key, namespace);
}
private S removeAndGetOld(K key, int keyGroupIndex, N namespace) {
checkKeyNamespacePreconditions(key, namespace);
return getMapForKeyGroup(keyGroupIndex).removeAndGetOld(key, namespace);
}
// ------------------------------------------------------------------------
// access to maps
// ------------------------------------------------------------------------
/** Returns the internal data structure. */
@VisibleForTesting
public StateMap<K, N, S>[] getState() {
return keyGroupedStateMaps;
}
public int getKeyGroupOffset() {
return keyGroupRange.getStartKeyGroup();
}
@VisibleForTesting
public StateMap<K, N, S> getMapForKeyGroup(int keyGroupIndex) {
final int pos = indexToOffset(keyGroupIndex);
if (pos >= 0 && pos < keyGroupedStateMaps.length) {
return keyGroupedStateMaps[pos];
} else {
throw KeyGroupRangeOffsets.newIllegalKeyGroupException(keyGroupIndex, keyGroupRange);
}
}
/** Translates a key-group id to the internal array offset. */
private int indexToOffset(int index) {
return index - getKeyGroupOffset();
}
// Meta data setter / getter and toString -----------------------------------------------------
public TypeSerializer<K> getKeySerializer() {
return keySerializer;
}
public TypeSerializer<S> getStateSerializer() {
return metaInfo.getStateSerializer();
}
public TypeSerializer<N> getNamespaceSerializer() {
return metaInfo.getNamespaceSerializer();
}
public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() {
return metaInfo;
}
public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
this.metaInfo = metaInfo;
}
// Snapshot / Restore -------------------------------------------------------------------------
public void put(K key, int keyGroup, N namespace, S state) {
checkKeyNamespacePreconditions(key, namespace);
getMapForKeyGroup(keyGroup).put(key, namespace, state);
}
@Override
public Iterator<StateEntry<K, N, S>> iterator() {
return Arrays.stream(keyGroupedStateMaps)
.filter(Objects::nonNull)
.flatMap(
stateMap ->
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(stateMap.iterator(), 0),
false))
.iterator();
}
// For testing --------------------------------------------------------------------------------
@VisibleForTesting
public int sizeOfNamespace(Object namespace) {
int count = 0;
for (StateMap<K, N, S> stateMap : keyGroupedStateMaps) {
count += stateMap.sizeOfNamespace(namespace);
}
return count;
}
@Nonnull
@Override
public StateSnapshotKeyGroupReader keyGroupReader(int readVersion) {
return StateTableByKeyGroupReaders.readerForVersion(this, readVersion);
}
// StateEntryIterator
// ---------------------------------------------------------------------------------------------
class StateEntryIterator implements StateIncrementalVisitor<K, N, S> {
final int recommendedMaxNumberOfReturnedRecords;
int keyGroupIndex;
StateIncrementalVisitor<K, N, S> stateIncrementalVisitor;
StateEntryIterator(int recommendedMaxNumberOfReturnedRecords) {
this.recommendedMaxNumberOfReturnedRecords = recommendedMaxNumberOfReturnedRecords;
this.keyGroupIndex = 0;
next();
}
private void next() {
while (keyGroupIndex < keyGroupedStateMaps.length) {
StateMap<K, N, S> stateMap = keyGroupedStateMaps[keyGroupIndex++];
StateIncrementalVisitor<K, N, S> visitor =
stateMap.getStateIncrementalVisitor(recommendedMaxNumberOfReturnedRecords);
if (visitor.hasNext()) {
stateIncrementalVisitor = visitor;
return;
}
}
}
@Override
public boolean hasNext() {
while (stateIncrementalVisitor == null || !stateIncrementalVisitor.hasNext()) {
if (keyGroupIndex == keyGroupedStateMaps.length) {
return false;
}
StateIncrementalVisitor<K, N, S> visitor =
keyGroupedStateMaps[keyGroupIndex++].getStateIncrementalVisitor(
recommendedMaxNumberOfReturnedRecords);
if (visitor.hasNext()) {
stateIncrementalVisitor = visitor;
break;
}
}
return true;
}
@Override
public Collection<StateEntry<K, N, S>> nextEntries() {
if (!hasNext()) {
return null;
}
return stateIncrementalVisitor.nextEntries();
}
@Override
public void remove(StateEntry<K, N, S> stateEntry) {
keyGroupedStateMaps[keyGroupIndex - 1].remove(
stateEntry.getKey(), stateEntry.getNamespace());
}
@Override
public void update(StateEntry<K, N, S> stateEntry, S newValue) {
keyGroupedStateMaps[keyGroupIndex - 1].put(
stateEntry.getKey(), stateEntry.getNamespace(), newValue);
}
}
}