Skip to content

Commit

Permalink
[FLINK-21649][state/heap] Allow extension of CopyOnWriteState classes
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Mar 31, 2022
1 parent 33e7c84 commit f0e9c27
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 15 deletions.
Expand Up @@ -203,7 +203,7 @@ public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
*
* @param stateSerializer the serializer of the key.
*/
CopyOnWriteStateMap(TypeSerializer<S> stateSerializer) {
protected CopyOnWriteStateMap(TypeSerializer<S> stateSerializer) {
this(DEFAULT_CAPACITY, stateSerializer);
}

Expand Down Expand Up @@ -382,7 +382,7 @@ public <T> void transform(
// ---------------------------------------------------------------

/** Helper method that is the basis for operations that add mappings. */
private StateMapEntry<K, N, S> putEntry(K key, N namespace) {
protected StateMapEntry<K, N, S> putEntry(K key, N namespace) {

final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
Expand All @@ -409,7 +409,7 @@ private StateMapEntry<K, N, S> putEntry(K key, N namespace) {
}

/** Helper method that is the basis for operations that remove mappings. */
private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {
protected StateMapEntry<K, N, S> removeEntry(K key, N namespace) {

final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
Expand Down Expand Up @@ -491,6 +491,7 @@ StateMapEntry<K, N, S>[] snapshotMapArrays() {
highestRequiredSnapshotVersion = stateMapVersion;
snapshotVersions.add(highestRequiredSnapshotVersion);
}
onVersionUpdate(stateMapVersion);

StateMapEntry<K, N, S>[] table = primaryTable;

Expand Down Expand Up @@ -533,7 +534,7 @@ StateMapEntry<K, N, S>[] snapshotMapArrays() {
return copy;
}

int getStateMapVersion() {
public int getStateMapVersion() {
return stateMapVersion;
}

Expand Down Expand Up @@ -808,7 +809,7 @@ public TypeSerializer<S> getStateSerializer() {
* @param <S> type of state.
*/
@VisibleForTesting
protected static class StateMapEntry<K, N, S> implements StateEntry<K, N, S> {
public static class StateMapEntry<K, N, S> implements StateEntry<K, N, S> {

/** The key. Assumed to be immumap and not null. */
@Nonnull final K key;
Expand Down Expand Up @@ -843,7 +844,7 @@ protected static class StateMapEntry<K, N, S> implements StateEntry<K, N, S> {
/** The computed secondary hash for the composite of key and namespace. */
final int hash;

StateMapEntry(StateMapEntry<K, N, S> other, int entryVersion) {
public StateMapEntry(StateMapEntry<K, N, S> other, int entryVersion) {
this(
other.key,
other.namespace,
Expand Down Expand Up @@ -919,6 +920,18 @@ public final int hashCode() {
public final String toString() {
return "(" + key + "|" + namespace + ")=" + state;
}

public int getStateVersion() {
return stateVersion;
}

public int getEntryVersion() {
return entryVersion;
}

public StateMapEntry<K, N, S> next() {
return next;
}
}

// For testing
Expand Down Expand Up @@ -1086,4 +1099,6 @@ public void update(StateEntry<K, N, S> stateEntry, S newValue) {
CopyOnWriteStateMap.this.put(stateEntry.getKey(), stateEntry.getNamespace(), newValue);
}
}

protected void onVersionUpdate(int stateMapVersion) {}
}
Expand Up @@ -58,15 +58,15 @@ public class CopyOnWriteStateMapSnapshot<K, N, S>
* Version of the {@link CopyOnWriteStateMap} when this snapshot was created. This can be used
* to release the snapshot.
*/
private final int snapshotVersion;
protected final int snapshotVersion;

/**
* The state map entries, as by the time this snapshot was created. Objects in this array may or
* may not be deep copies of the current entries in the {@link CopyOnWriteStateMap} that created
* this snapshot. This depends for each entry on whether or not it was subject to copy-on-write
* operations by the {@link CopyOnWriteStateMap}.
*/
@Nonnull private final CopyOnWriteStateMap.StateMapEntry<K, N, S>[] snapshotData;
@Nonnull protected final CopyOnWriteStateMap.StateMapEntry<K, N, S>[] snapshotData;

/** The number of (non-null) entries in snapshotData. */
@Nonnegative private final int numberOfEntriesInSnapshotData;
Expand All @@ -80,7 +80,7 @@ public class CopyOnWriteStateMapSnapshot<K, N, S>
* @param owningStateMap the {@link CopyOnWriteStateMap} for which this object represents a
* snapshot.
*/
CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap<K, N, S> owningStateMap) {
protected CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap<K, N, S> owningStateMap) {
super(owningStateMap);

this.snapshotData = owningStateMap.snapshotMapArrays();
Expand Down Expand Up @@ -111,7 +111,7 @@ int getSnapshotVersion() {
}

@Override
public SnapshotIterator<K, N, S> getIterator(
public Iterator<StateEntry<K, N, S>> getIterator(
@Nonnull TypeSerializer<K> keySerializer,
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull TypeSerializer<S> stateSerializer,
Expand All @@ -131,14 +131,14 @@ public void writeState(
@Nonnull DataOutputView dov,
@Nullable StateSnapshotTransformer<S> stateSnapshotTransformer)
throws IOException {
SnapshotIterator<K, N, S> snapshotIterator =
Iterator<StateEntry<K, N, S>> snapshotIterator =
getIterator(
keySerializer,
namespaceSerializer,
stateSerializer,
stateSnapshotTransformer);

int size = snapshotIterator.size();
int size = ((SnapshotIterator<K, N, S>) snapshotIterator).size();
dov.writeInt(size);
while (snapshotIterator.hasNext()) {
StateEntry<K, N, S> stateEntry = snapshotIterator.next();
Expand Down
Expand Up @@ -43,7 +43,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> {
* @param metaInfo the meta information, including the type serializer for state copy-on-write.
* @param keySerializer the serializer of the key.
*/
CopyOnWriteStateTable(
protected CopyOnWriteStateTable(
InternalKeyContext<K> keyContext,
RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo,
TypeSerializer<K> keySerializer) {
Expand Down
Expand Up @@ -42,15 +42,15 @@ public class CopyOnWriteStateTableSnapshot<K, N, S> extends AbstractStateTableSn
private final int keyGroupOffset;

/** Snapshots of state partitioned by key-group. */
@Nonnull private final List<CopyOnWriteStateMapSnapshot<K, N, S>> stateMapSnapshots;
@Nonnull protected final List<CopyOnWriteStateMapSnapshot<K, N, S>> stateMapSnapshots;

/**
* Creates a new {@link CopyOnWriteStateTableSnapshot}.
*
* @param owningStateTable the {@link CopyOnWriteStateTable} for which this object represents a
* snapshot.
*/
CopyOnWriteStateTableSnapshot(
protected CopyOnWriteStateTableSnapshot(
CopyOnWriteStateTable<K, N, S> owningStateTable,
TypeSerializer<K> localKeySerializer,
TypeSerializer<N> localNamespaceSerializer,
Expand Down

0 comments on commit f0e9c27

Please sign in to comment.