From f0e9c27bf807966a742bd22bf02dc600950f28ce Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Thu, 31 Mar 2022 16:14:19 +0200 Subject: [PATCH] [FLINK-21649][state/heap] Allow extension of CopyOnWriteState classes --- .../state/heap/CopyOnWriteStateMap.java | 27 ++++++++++++++----- .../heap/CopyOnWriteStateMapSnapshot.java | 12 ++++----- .../state/heap/CopyOnWriteStateTable.java | 2 +- .../heap/CopyOnWriteStateTableSnapshot.java | 4 +-- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java index e33a3cf04980c..e8f9dbc131d34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java @@ -203,7 +203,7 @@ public class CopyOnWriteStateMap extends StateMap { * * @param stateSerializer the serializer of the key. */ - CopyOnWriteStateMap(TypeSerializer stateSerializer) { + protected CopyOnWriteStateMap(TypeSerializer stateSerializer) { this(DEFAULT_CAPACITY, stateSerializer); } @@ -382,7 +382,7 @@ public void transform( // --------------------------------------------------------------- /** Helper method that is the basis for operations that add mappings. */ - private StateMapEntry putEntry(K key, N namespace) { + protected StateMapEntry putEntry(K key, N namespace) { final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace); final StateMapEntry[] tab = selectActiveTable(hash); @@ -409,7 +409,7 @@ private StateMapEntry putEntry(K key, N namespace) { } /** Helper method that is the basis for operations that remove mappings. */ - private StateMapEntry removeEntry(K key, N namespace) { + protected StateMapEntry removeEntry(K key, N namespace) { final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace); final StateMapEntry[] tab = selectActiveTable(hash); @@ -491,6 +491,7 @@ StateMapEntry[] snapshotMapArrays() { highestRequiredSnapshotVersion = stateMapVersion; snapshotVersions.add(highestRequiredSnapshotVersion); } + onVersionUpdate(stateMapVersion); StateMapEntry[] table = primaryTable; @@ -533,7 +534,7 @@ StateMapEntry[] snapshotMapArrays() { return copy; } - int getStateMapVersion() { + public int getStateMapVersion() { return stateMapVersion; } @@ -808,7 +809,7 @@ public TypeSerializer getStateSerializer() { * @param type of state. */ @VisibleForTesting - protected static class StateMapEntry implements StateEntry { + public static class StateMapEntry implements StateEntry { /** The key. Assumed to be immumap and not null. */ @Nonnull final K key; @@ -843,7 +844,7 @@ protected static class StateMapEntry implements StateEntry { /** The computed secondary hash for the composite of key and namespace. */ final int hash; - StateMapEntry(StateMapEntry other, int entryVersion) { + public StateMapEntry(StateMapEntry other, int entryVersion) { this( other.key, other.namespace, @@ -919,6 +920,18 @@ public final int hashCode() { public final String toString() { return "(" + key + "|" + namespace + ")=" + state; } + + public int getStateVersion() { + return stateVersion; + } + + public int getEntryVersion() { + return entryVersion; + } + + public StateMapEntry next() { + return next; + } } // For testing @@ -1086,4 +1099,6 @@ public void update(StateEntry stateEntry, S newValue) { CopyOnWriteStateMap.this.put(stateEntry.getKey(), stateEntry.getNamespace(), newValue); } } + + protected void onVersionUpdate(int stateMapVersion) {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMapSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMapSnapshot.java index d6c06f0f0d3ab..0a58a06455881 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMapSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMapSnapshot.java @@ -58,7 +58,7 @@ public class CopyOnWriteStateMapSnapshot * Version of the {@link CopyOnWriteStateMap} when this snapshot was created. This can be used * to release the snapshot. */ - private final int snapshotVersion; + protected final int snapshotVersion; /** * The state map entries, as by the time this snapshot was created. Objects in this array may or @@ -66,7 +66,7 @@ public class CopyOnWriteStateMapSnapshot * this snapshot. This depends for each entry on whether or not it was subject to copy-on-write * operations by the {@link CopyOnWriteStateMap}. */ - @Nonnull private final CopyOnWriteStateMap.StateMapEntry[] snapshotData; + @Nonnull protected final CopyOnWriteStateMap.StateMapEntry[] snapshotData; /** The number of (non-null) entries in snapshotData. */ @Nonnegative private final int numberOfEntriesInSnapshotData; @@ -80,7 +80,7 @@ public class CopyOnWriteStateMapSnapshot * @param owningStateMap the {@link CopyOnWriteStateMap} for which this object represents a * snapshot. */ - CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap owningStateMap) { + protected CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap owningStateMap) { super(owningStateMap); this.snapshotData = owningStateMap.snapshotMapArrays(); @@ -111,7 +111,7 @@ int getSnapshotVersion() { } @Override - public SnapshotIterator getIterator( + public Iterator> getIterator( @Nonnull TypeSerializer keySerializer, @Nonnull TypeSerializer namespaceSerializer, @Nonnull TypeSerializer stateSerializer, @@ -131,14 +131,14 @@ public void writeState( @Nonnull DataOutputView dov, @Nullable StateSnapshotTransformer stateSnapshotTransformer) throws IOException { - SnapshotIterator snapshotIterator = + Iterator> snapshotIterator = getIterator( keySerializer, namespaceSerializer, stateSerializer, stateSnapshotTransformer); - int size = snapshotIterator.size(); + int size = ((SnapshotIterator) snapshotIterator).size(); dov.writeInt(size); while (snapshotIterator.hasNext()) { StateEntry stateEntry = snapshotIterator.next(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java index 3e23098094e1a..11c342d8d7611 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java @@ -43,7 +43,7 @@ public class CopyOnWriteStateTable extends StateTable { * @param metaInfo the meta information, including the type serializer for state copy-on-write. * @param keySerializer the serializer of the key. */ - CopyOnWriteStateTable( + protected CopyOnWriteStateTable( InternalKeyContext keyContext, RegisteredKeyValueStateBackendMetaInfo metaInfo, TypeSerializer keySerializer) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java index 51548014d9354..753bc99395837 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java @@ -42,7 +42,7 @@ public class CopyOnWriteStateTableSnapshot extends AbstractStateTableSn private final int keyGroupOffset; /** Snapshots of state partitioned by key-group. */ - @Nonnull private final List> stateMapSnapshots; + @Nonnull protected final List> stateMapSnapshots; /** * Creates a new {@link CopyOnWriteStateTableSnapshot}. @@ -50,7 +50,7 @@ public class CopyOnWriteStateTableSnapshot extends AbstractStateTableSn * @param owningStateTable the {@link CopyOnWriteStateTable} for which this object represents a * snapshot. */ - CopyOnWriteStateTableSnapshot( + protected CopyOnWriteStateTableSnapshot( CopyOnWriteStateTable owningStateTable, TypeSerializer localKeySerializer, TypeSerializer localNamespaceSerializer,