From 58e5bce6a4adf99a10b2256ff624a7da23e27a80 Mon Sep 17 00:00:00 2001 From: "xiaogang.sxg" Date: Thu, 26 Jan 2017 19:44:32 +0100 Subject: [PATCH] [FLINK-5024] [core] Refactor the interface of State and StateDescriptor --- .../streaming/state/AbstractRocksDBState.java | 3 +- .../state/RocksDBAggregatingState.java | 2 +- .../streaming/state/RocksDBFoldingState.java | 4 +- .../state/RocksDBKeyedStateBackend.java | 64 ++-- .../streaming/state/RocksDBListState.java | 2 +- .../streaming/state/RocksDBReducingState.java | 2 +- .../streaming/state/RocksDBValueState.java | 2 +- .../streaming/state/RocksDBStateBackend.java | 2 +- .../state/AggregatingStateDescriptor.java | 22 +- .../common/state/FoldingStateDescriptor.java | 139 ++++++-- .../api/common/state/ListStateDescriptor.java | 48 +-- .../common/state/ReducingStateDescriptor.java | 55 +-- .../common/state/SimpleStateDescriptor.java | 158 +++++++++ .../apache/flink/api/common/state/State.java | 13 +- .../api/common/state/StateDescriptor.java | 234 ++---------- .../common/state/ValueStateDescriptor.java | 134 ++++++- .../common/state/FoldingStateDescriptor.java | 144 ++++++++ .../api/common/state/ListStateDescriptor.java | 107 ++++++ .../common/state/ReducingStateDescriptor.java | 132 +++++++ .../api/common/state/StateBackend.java | 66 ++++ .../api/common/state/StateDescriptor.java | 332 ++++++++++++++++++ .../common/state/ValueStateDescriptor.java | 117 ++++++ .../runtime/state/KvStateSnapshot.java | 2 +- .../filesystem/AbstractFsStateSnapshot.java | 2 +- .../state/filesystem/FsFoldingState.java | 2 +- .../runtime/state/filesystem/FsListState.java | 2 +- .../state/filesystem/FsReducingState.java | 2 +- .../state/filesystem/FsValueState.java | 2 +- .../memory/AbstractMemStateSnapshot.java | 2 +- .../runtime/state/memory/MemFoldingState.java | 2 +- .../runtime/state/memory/MemListState.java | 2 +- .../state/memory/MemReducingState.java | 2 +- .../runtime/state/memory/MemValueState.java | 2 +- .../state/AbstractKeyedStateBackend.java | 4 +- .../runtime/state/DefaultKeyedStateStore.java | 2 +- .../runtime/state/KeyedStateBackend.java | 4 +- .../state/heap/AbstractHeapMergingState.java | 4 +- .../runtime/state/heap/AbstractHeapState.java | 3 +- .../runtime/state/heap/HeapFoldingState.java | 2 +- .../state/heap/HeapKeyedStateBackend.java | 6 +- .../savepoint/MigrationV0ToV1Test.java | 3 +- .../query/AbstractQueryableStateOperator.java | 4 +- .../QueryableAppendingStateOperator.java | 2 +- .../query/QueryableValueStateOperator.java | 2 +- .../api/operators/AbstractStreamOperator.java | 6 +- .../operators/StreamingRuntimeContext.java | 2 +- .../api/windowing/triggers/Trigger.java | 4 +- .../windowing/EvictingWindowOperator.java | 8 +- .../operators/windowing/WindowOperator.java | 27 +- .../operators/StateDescriptorPassingTest.java | 5 +- .../StreamingRuntimeContextTest.java | 46 +-- .../windowing/TriggerTestHarness.java | 4 +- .../windowing/WindowOperatorContractTest.java | 2 +- 53 files changed, 1549 insertions(+), 395 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/state/SimpleStateDescriptor.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/FoldingStateDescriptor.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ReducingStateDescriptor.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/StateBackend.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/StateDescriptor.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ValueStateDescriptor.java diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 89f41aacc8279..6bfe174012635 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -17,6 +17,7 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.api.common.state.SimpleStateDescriptor; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -46,7 +47,7 @@ * @param The type of {@link State}. * @param The type of {@link StateDescriptor}. */ -public abstract class AbstractRocksDBState, V> +public abstract class AbstractRocksDBState> implements InternalKvState, State { /** Serializer for the namespace */ diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java index 1f306b44e1cda..7e2d317f15c71 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java @@ -44,7 +44,7 @@ * @param The type of the value returned from the state */ public class RocksDBAggregatingState - extends AbstractRocksDBState, AggregatingStateDescriptor, ACC> + extends AbstractRocksDBState, AggregatingStateDescriptor> implements InternalAggregatingState { /** Serializer for the values */ diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java index 26dc3ddb2bf29..d4469f27806d3 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -41,7 +41,7 @@ * @param The type of the value in the folding state. */ public class RocksDBFoldingState - extends AbstractRocksDBState, FoldingStateDescriptor, ACC> + extends AbstractRocksDBState, FoldingStateDescriptor> implements InternalFoldingState { /** Serializer for the values */ @@ -101,7 +101,7 @@ public void add(T value) throws IOException { DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); if (valueBytes == null) { keySerializationStream.reset(); - valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out); + valueSerializer.serialize(foldFunction.fold(stateDesc.getInitialValue(), value), out); backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); } else { ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index d8d77b645e821..4bf002e969085 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -37,6 +37,7 @@ import org.apache.flink.migration.MigrationNamespaceSerializerProxy; import org.apache.flink.migration.MigrationUtil; import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.migration.util.MigrationInstantiationUtil; import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -60,6 +61,7 @@ import org.apache.flink.util.IOUtils; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; + import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -69,6 +71,7 @@ import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.rocksdb.Snapshot; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -795,18 +798,20 @@ private void restoreKVStateData() throws IOException, RocksDBException { *

This also checks whether the {@link StateDescriptor} for a state matches the one * that we checkpointed, i.e. is already in the map of column families. */ - @SuppressWarnings("rawtypes, unchecked") protected ColumnFamilyHandle getColumnFamily( - StateDescriptor descriptor, TypeSerializer namespaceSerializer) throws IOException { + StateDescriptor.Type type, + String name, + TypeSerializer namespaceSerializer, + TypeSerializer stateSerializer) throws IOException { Tuple2> stateInfo = - kvStateInformation.get(descriptor.getName()); + kvStateInformation.get(name); RegisteredBackendStateMetaInfo newMetaInfo = new RegisteredBackendStateMetaInfo<>( - descriptor.getType(), - descriptor.getName(), + type, + name, namespaceSerializer, - descriptor.getSerializer()); + stateSerializer); if (stateInfo != null) { if (newMetaInfo.isCompatibleWith(stateInfo.f1)) { @@ -818,15 +823,14 @@ protected ColumnFamilyHandle getColumnFamily( } } - ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor( - descriptor.getName().getBytes(), columnOptions); + ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(name.getBytes(), columnOptions); try { ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor); Tuple2> tuple = new Tuple2<>(columnFamily, newMetaInfo); Map rawAccess = kvStateInformation; - rawAccess.put(descriptor.getName(), tuple); + rawAccess.put(name, tuple); return columnFamily; } catch (RocksDBException e) { throw new IOException("Error creating ColumnFamilyHandle.", e); @@ -838,7 +842,8 @@ protected InternalValueState createValueState( TypeSerializer namespaceSerializer, ValueStateDescriptor stateDesc) throws Exception { - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + ColumnFamilyHandle columnFamily = getColumnFamily( + stateDesc.getType(), stateDesc.getName(), namespaceSerializer, stateDesc.getSerializer()); return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this); } @@ -848,7 +853,8 @@ protected InternalListState createListState( TypeSerializer namespaceSerializer, ListStateDescriptor stateDesc) throws Exception { - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + ColumnFamilyHandle columnFamily = getColumnFamily( + stateDesc.getType(), stateDesc.getName(), namespaceSerializer, stateDesc.getSerializer()); return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this); } @@ -858,7 +864,8 @@ protected InternalReducingState createReducingState( TypeSerializer namespaceSerializer, ReducingStateDescriptor stateDesc) throws Exception { - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + ColumnFamilyHandle columnFamily = getColumnFamily( + stateDesc.getType(), stateDesc.getName(), namespaceSerializer, stateDesc.getSerializer()); return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this); } @@ -868,7 +875,9 @@ protected InternalAggregatingState createAggregatingStat TypeSerializer namespaceSerializer, AggregatingStateDescriptor stateDesc) throws Exception { - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + ColumnFamilyHandle columnFamily = getColumnFamily( + stateDesc.getType(), stateDesc.getName(), namespaceSerializer, stateDesc.getSerializer()); + return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this); } @@ -877,7 +886,8 @@ protected InternalFoldingState createFoldingState( TypeSerializer namespaceSerializer, FoldingStateDescriptor stateDesc) throws Exception { - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); + ColumnFamilyHandle columnFamily = getColumnFamily( + stateDesc.getType(), stateDesc.getName(), namespaceSerializer, stateDesc.getSerializer()); return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this); } @@ -1142,30 +1152,40 @@ private void restoreOldSavepointKeyedState(Collection rest kvStateInformation.clear(); // first get the column family mapping - int numColumns = inputView.readInt(); - Map> columnFamilyMapping = new HashMap<>(numColumns); + final int numColumns = inputView.readInt(); + final Map> columnFamilyMapping = + new HashMap<>(numColumns); + for (int i = 0; i < numColumns; i++) { - byte mappingByte = inputView.readByte(); + final byte mappingByte = inputView.readByte(); ObjectInputStream ooIn = - new InstantiationUtil.ClassLoaderObjectInputStream( + new MigrationInstantiationUtil.ClassLoaderObjectInputStream( new DataInputViewStream(inputView), userCodeClassLoader); - StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject(); + org.apache.flink.migration.api.common.state.StateDescriptor stateDescriptor = + (org.apache.flink.migration.api.common.state.StateDescriptor) ooIn.readObject(); columnFamilyMapping.put(mappingByte, stateDescriptor); // this will fill in the k/v state information - getColumnFamily(stateDescriptor, MigrationNamespaceSerializerProxy.INSTANCE); + getColumnFamily( + stateDescriptor.getType(), stateDescriptor.getName(), + MigrationNamespaceSerializerProxy.INSTANCE, stateDescriptor.getSerializer()); } // try and read until EOF try { // the EOFException will get us out of this... while (true) { - byte mappingByte = inputView.readByte(); + final byte mappingByte = inputView.readByte(); + + org.apache.flink.migration.api.common.state.StateDescriptor stateDescriptor = + columnFamilyMapping.get(mappingByte); + ColumnFamilyHandle handle = getColumnFamily( - columnFamilyMapping.get(mappingByte), MigrationNamespaceSerializerProxy.INSTANCE); + stateDescriptor.getType(), stateDescriptor.getName(), + MigrationNamespaceSerializerProxy.INSTANCE, stateDescriptor.getSerializer()); byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView); diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index e6988f7a7ed15..dbcfd64b118e2 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -47,7 +47,7 @@ * @param The type of the values in the list state. */ public class RocksDBListState - extends AbstractRocksDBState, ListStateDescriptor, V> + extends AbstractRocksDBState, ListStateDescriptor> implements InternalListState { /** Serializer for the values */ diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java index ccc98a701acb5..403a93758907d 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java @@ -43,7 +43,7 @@ * @param The type of value that the state state stores. */ public class RocksDBReducingState - extends AbstractRocksDBState, ReducingStateDescriptor, V> + extends AbstractRocksDBState, ReducingStateDescriptor> implements InternalReducingState { /** Serializer for the values */ diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index b2a4fba265cfb..661763f1263db 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -39,7 +39,7 @@ * @param The type of value that the state state stores. */ public class RocksDBValueState - extends AbstractRocksDBState, ValueStateDescriptor, V> + extends AbstractRocksDBState, ValueStateDescriptor> implements InternalValueState { /** Serializer for the values */ diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java index fa1cc45e0aa90..62a3c8f377efc 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java @@ -18,8 +18,8 @@ package org.apache.flink.migration.contrib.streaming.state; import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.migration.api.common.state.ValueStateDescriptor; import org.apache.flink.migration.runtime.state.AbstractStateBackend; import org.apache.flink.migration.runtime.state.KvStateSnapshot; import org.apache.flink.migration.runtime.state.StateHandle; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java index abdac915e874a..bdf1b1ac98a01 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java @@ -36,12 +36,14 @@ * @param The type of the values that are returned from the state. */ @PublicEvolving -public class AggregatingStateDescriptor extends StateDescriptor, ACC> { +public class AggregatingStateDescriptor extends SimpleStateDescriptor> { private static final long serialVersionUID = 1L; /** The aggregation function for the state */ private final AggregateFunction aggFunction; + // ------------------------------------------------------------------------ + /** * Creates a new state descriptor with the given name, function, and type. * @@ -57,7 +59,7 @@ public AggregatingStateDescriptor( AggregateFunction aggFunction, Class stateType) { - super(name, stateType, null); + super(name, stateType); this.aggFunction = checkNotNull(aggFunction); } @@ -73,7 +75,7 @@ public AggregatingStateDescriptor( AggregateFunction aggFunction, TypeInformation stateType) { - super(name, stateType, null); + super(name, stateType); this.aggFunction = checkNotNull(aggFunction); } @@ -89,10 +91,12 @@ public AggregatingStateDescriptor( AggregateFunction aggFunction, TypeSerializer typeSerializer) { - super(name, typeSerializer, null); + super(name, typeSerializer); this.aggFunction = checkNotNull(aggFunction); } + // ------------------------------------------------------------------------ + // Aggregating State Descriptor // ------------------------------------------------------------------------ @Override @@ -112,6 +116,8 @@ public Type getType() { return Type.AGGREGATING; } + // ------------------------------------------------------------------------ + // Standard Utils // ------------------------------------------------------------------------ @Override @@ -121,7 +127,7 @@ public boolean equals(Object o) { } else if (o != null && getClass() == o.getClass()) { AggregatingStateDescriptor that = (AggregatingStateDescriptor) o; - return serializer.equals(that.serializer) && name.equals(that.name); + return name.equals(that.name) && simpleStateDescrEquals(that); } else { return false; @@ -130,7 +136,7 @@ else if (o != null && getClass() == o.getClass()) { @Override public int hashCode() { - int result = serializer.hashCode(); + int result = simpleStateDescrHashCode(); result = 31 * result + name.hashCode(); return result; } @@ -138,8 +144,8 @@ public int hashCode() { @Override public String toString() { return "AggregatingStateDescriptor{" + - "serializer=" + serializer + - ", aggFunction=" + aggFunction + + "aggFunction=" + aggFunction + + ", " + simpleStateDescrToString() + '}'; } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java index 143945e5e6037..71d3f545871f0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java @@ -23,23 +23,36 @@ import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import static java.util.Objects.requireNonNull; /** - * {@link StateDescriptor} for {@link FoldingState}. This can be used to create partitioned - * folding state. + * A {@link StateDescriptor} for {@link FoldingState}. This can be used to create + * keyed folding state. * - * @param Type of the values folded int othe state + * @param Type of the values folded into the state * @param Type of the value in the state */ @PublicEvolving -public class FoldingStateDescriptor extends StateDescriptor, ACC> { +public class FoldingStateDescriptor extends SimpleStateDescriptor> { private static final long serialVersionUID = 1L; - + /** The function that folds the state */ private final FoldFunction foldFunction; + /** The initial accumulator value for the fold() operation */ + private transient ACC initialValue; + + // ------------------------------------------------------------------------ + /** * Creates a new {@code FoldingStateDescriptor} with the given name, type, and initial value. * @@ -52,12 +65,14 @@ public class FoldingStateDescriptor extends StateDescriptor foldFunction, Class typeClass) { - super(name, typeClass, initialValue); - this.foldFunction = requireNonNull(foldFunction); + super(name, typeClass); if (foldFunction instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction."); } + + this.foldFunction = requireNonNull(foldFunction); + this.initialValue = initialValue; } /** @@ -69,12 +84,14 @@ public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction foldFunction, TypeInformation typeInfo) { - super(name, typeInfo, initialValue); - this.foldFunction = requireNonNull(foldFunction); + super(name, typeInfo); if (foldFunction instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction."); } + + this.foldFunction = requireNonNull(foldFunction); + this.initialValue = initialValue; } /** @@ -86,16 +103,25 @@ public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction foldFunction, TypeSerializer typeSerializer) { - super(name, typeSerializer, initialValue); - this.foldFunction = requireNonNull(foldFunction); + super(name, typeSerializer); if (foldFunction instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction."); } + + this.foldFunction = requireNonNull(foldFunction); + this.initialValue = initialValue; } // ------------------------------------------------------------------------ - + // Folding State Descriptor + // ------------------------------------------------------------------------ + + @Override + public Type getType() { + return Type.FOLDING; + } + @Override public FoldingState bind(StateBackend stateBackend) throws Exception { return stateBackend.createFoldingState(this); @@ -108,6 +134,19 @@ public FoldFunction getFoldFunction() { return foldFunction; } + /** + * Returns the initial value used in the folding. + */ + public ACC getInitialValue() { + if (initialValue != null) { + return getSerializer().copy(initialValue); + } else { + return null; + } + } + + // ------------------------------------------------------------------------ + @Override public boolean equals(Object o) { if (this == o) { @@ -118,14 +157,13 @@ public boolean equals(Object o) { } FoldingStateDescriptor that = (FoldingStateDescriptor) o; - - return serializer.equals(that.serializer) && name.equals(that.name); + return name.equals(that.name) && simpleStateDescrEquals(that); } @Override public int hashCode() { - int result = serializer.hashCode(); + int result = simpleStateDescrHashCode(); result = 31 * result + name.hashCode(); return result; } @@ -133,14 +171,75 @@ public int hashCode() { @Override public String toString() { return "FoldingStateDescriptor{" + - "serializer=" + serializer + - ", initialValue=" + defaultValue + + simpleStateDescrToString() + + ", initialValue=" + initialValue + ", foldFunction=" + foldFunction + '}'; } - @Override - public Type getType() { - return Type.FOLDING; + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ + + private void writeObject(final ObjectOutputStream out) throws IOException { + // write the fold function + out.defaultWriteObject(); + + // write the non-serializable default value field + if (initialValue == null) { + // we don't have an initial value + out.writeBoolean(false); + } else { + // we have an initial value + out.writeBoolean(true); + + byte[] serializedInitialValue; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) + { + // we duplicate the type serializer here, because the serializers may be asynchronously + // serialized into asynchronous snapshots + // Note: as of Flink 1.2, only the serializers are written, but not the entire state + // descriptors any more, so we may be safe do drop this? + TypeSerializer duplicateSerializer = getSerializer().duplicate(); + duplicateSerializer.serialize(initialValue, outView); + + outView.flush(); + serializedInitialValue = baos.toByteArray(); + } + catch (Exception e) { + throw new IOException("Unable to serialize initial value of type " + + initialValue.getClass().getSimpleName() + ".", e); + } + + out.writeInt(serializedInitialValue.length); + out.write(serializedInitialValue); + } + } + + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { + // read the fold function + in.defaultReadObject(); + + // read the initial value field + boolean hasInitialValue = in.readBoolean(); + if (hasInitialValue) { + int size = in.readInt(); + + byte[] buffer = new byte[size]; + + in.readFully(buffer); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer); + DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) + { + initialValue = getSerializer().deserialize(inView); + } + catch (Exception e) { + throw new IOException("Unable to deserialize initial value.", e); + } + } else { + initialValue = null; + } } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java index 6861a0766911c..f076c670f2149 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java @@ -27,10 +27,10 @@ * list state using * {@link org.apache.flink.api.common.functions.RuntimeContext#getListState(ListStateDescriptor)}. * - * @param The type of the values that can be added to the list state. + * @param The type of the elements in the list state. */ @PublicEvolving -public class ListStateDescriptor extends StateDescriptor, T> { +public class ListStateDescriptor extends SimpleStateDescriptor> { private static final long serialVersionUID = 1L; /** @@ -40,39 +40,50 @@ public class ListStateDescriptor extends StateDescriptor, T> { * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor. * * @param name The (unique) name for the state. - * @param typeClass The type of the values in the state. + * @param elementTypeClass The type of the elements in the state. */ - public ListStateDescriptor(String name, Class typeClass) { - super(name, typeClass, null); + public ListStateDescriptor(String name, Class elementTypeClass) { + super(name, elementTypeClass); } /** * Creates a new {@code ListStateDescriptor} with the given name and list element type. * * @param name The (unique) name for the state. - * @param typeInfo The type of the values in the state. + * @param elementTypeInfo The type of the elements in the state. */ - public ListStateDescriptor(String name, TypeInformation typeInfo) { - super(name, typeInfo, null); + public ListStateDescriptor(String name, TypeInformation elementTypeInfo) { + super(name, elementTypeInfo); } /** * Creates a new {@code ListStateDescriptor} with the given name and list element type. * * @param name The (unique) name for the state. - * @param typeSerializer The type serializer for the list values. + * @param elementTypeSerializer The type serializer for the elements in the state. */ - public ListStateDescriptor(String name, TypeSerializer typeSerializer) { - super(name, typeSerializer, null); + public ListStateDescriptor(String name, TypeSerializer elementTypeSerializer) { + super(name, elementTypeSerializer); } - + + // ------------------------------------------------------------------------ + // List State Descriptor // ------------------------------------------------------------------------ + @Override + public Type getType() { + return Type.LIST; + } + @Override public ListState bind(StateBackend stateBackend) throws Exception { return stateBackend.createListState(this); } + // ------------------------------------------------------------------------ + // Standard utils + // ------------------------------------------------------------------------ + @Override public boolean equals(Object o) { if (this == o) { @@ -83,14 +94,12 @@ public boolean equals(Object o) { } ListStateDescriptor that = (ListStateDescriptor) o; - - return serializer.equals(that.serializer) && name.equals(that.name); - + return name.equals(that.name) && simpleStateDescrEquals(that); } @Override public int hashCode() { - int result = serializer.hashCode(); + int result = simpleStateDescrHashCode(); result = 31 * result + name.hashCode(); return result; } @@ -98,12 +107,7 @@ public int hashCode() { @Override public String toString() { return "ListStateDescriptor{" + - "serializer=" + serializer + + simpleStateDescrToString() + '}'; } - - @Override - public Type getType() { - return Type.LIST; - } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java index a1d4225fbb2e5..76ef4fb6d0cb1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java @@ -27,17 +27,17 @@ import static java.util.Objects.requireNonNull; /** - * {@link StateDescriptor} for {@link ReducingState}. This can be used to create partitioned - * reducing state using + * A {@link StateDescriptor} for {@link ReducingState}. This descriptor can be used to create + * keyed reducing state using * {@link org.apache.flink.api.common.functions.RuntimeContext#getReducingState(ReducingStateDescriptor)}. * * @param The type of the values that can be added to the list state. */ @PublicEvolving -public class ReducingStateDescriptor extends StateDescriptor, T> { +public class ReducingStateDescriptor extends SimpleStateDescriptor> { private static final long serialVersionUID = 1L; - - + + /** The ReduceFunction that aggregates elements in the state */ private final ReduceFunction reduceFunction; /** @@ -51,12 +51,13 @@ public class ReducingStateDescriptor extends StateDescriptor * @param typeClass The type of the values in the state. */ public ReducingStateDescriptor(String name, ReduceFunction reduceFunction, Class typeClass) { - super(name, typeClass, null); - this.reduceFunction = requireNonNull(reduceFunction); + super(name, typeClass); if (reduceFunction instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of ReducingState can not be a RichFunction."); } + + this.reduceFunction = requireNonNull(reduceFunction); } /** @@ -67,7 +68,12 @@ public ReducingStateDescriptor(String name, ReduceFunction reduceFunction, Cl * @param typeInfo The type of the values in the state. */ public ReducingStateDescriptor(String name, ReduceFunction reduceFunction, TypeInformation typeInfo) { - super(name, typeInfo, null); + super(name, typeInfo); + + if (reduceFunction instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of ReducingState can not be a RichFunction."); + } + this.reduceFunction = requireNonNull(reduceFunction); } @@ -79,12 +85,24 @@ public ReducingStateDescriptor(String name, ReduceFunction reduceFunction, Ty * @param typeSerializer The type serializer of the values in the state. */ public ReducingStateDescriptor(String name, ReduceFunction reduceFunction, TypeSerializer typeSerializer) { - super(name, typeSerializer, null); + super(name, typeSerializer); + + if (reduceFunction instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of ReducingState can not be a RichFunction."); + } + this.reduceFunction = requireNonNull(reduceFunction); } // ------------------------------------------------------------------------ - + // Reducing State Descriptor + // ------------------------------------------------------------------------ + + @Override + public Type getType() { + return Type.REDUCING; + } + @Override public ReducingState bind(StateBackend stateBackend) throws Exception { return stateBackend.createReducingState(this); @@ -97,6 +115,10 @@ public ReduceFunction getReduceFunction() { return reduceFunction; } + // ------------------------------------------------------------------------ + // Standard Utils + // ------------------------------------------------------------------------ + @Override public boolean equals(Object o) { if (this == o) { @@ -107,14 +129,12 @@ public boolean equals(Object o) { } ReducingStateDescriptor that = (ReducingStateDescriptor) o; - - return serializer.equals(that.serializer) && name.equals(that.name); - + return name.equals(that.name) && simpleStateDescrEquals(that); } @Override public int hashCode() { - int result = serializer.hashCode(); + int result = simpleStateDescrHashCode(); result = 31 * result + name.hashCode(); return result; } @@ -122,13 +142,8 @@ public int hashCode() { @Override public String toString() { return "ReducingStateDescriptor{" + - "serializer=" + serializer + + simpleStateDescrToString() + ", reduceFunction=" + reduceFunction + '}'; } - - @Override - public Type getType() { - return Type.REDUCING; - } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/SimpleStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/SimpleStateDescriptor.java new file mode 100644 index 0000000000000..5bf2f79ecd63d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/SimpleStateDescriptor.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import java.io.IOException; +import java.io.ObjectOutputStream; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Base class for the descriptors of simple states. Simple state have one value + * that they store for a key. The counterpart to simple states are states that have composite values. + * + * @param The type of the value in the state. + * @param The type of the created state. + */ +@PublicEvolving +public abstract class SimpleStateDescriptor extends StateDescriptor { + + private static final long serialVersionUID = 1L; + + /** The serializer for the type. May be eagerly initialized in the constructor, + * or lazily once the type is serialized or an ExecutionConfig is provided. */ + private TypeSerializer typeSerializer; + + /** The type information describing the value type. Only used to lazily create the serializer + * and dropped during serialization */ + private transient TypeInformation typeInfo; + + // ------------------------------------------------------------------------ + + /** + * Create a new {@code SimpleStateDescriptor} with the given name and the given type serializer. + * + * @param name The name of the {@code SimpleStateDescriptor}. + * @param serializer The type serializer for the values in the state. + */ + protected SimpleStateDescriptor(String name, TypeSerializer serializer) { + super(name); + this.typeSerializer = checkNotNull(serializer, "serializer must not be null"); + } + + /** + * Create a new {@code SimpleStateDescriptor} with the given name and the given type information. + * + * @param name The name of the {@code SimpleStateDescriptor}. + * @param typeInfo The type information for the values in the state. + */ + protected SimpleStateDescriptor(String name, TypeInformation typeInfo) { + super(name); + this.typeInfo = checkNotNull(typeInfo, "type information must not be null"); + } + + /** + * Create a new {@code StateDescriptor} with the given name and the given type information. + * + *

If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #SimpleStateDescriptor(String, TypeInformation)} constructor. + * + * @param name The name of the {@code StateDescriptor}. + * @param type The class of the type of values in the state. + */ + protected SimpleStateDescriptor(String name, Class type) { + super(name); + + checkNotNull(type, "type class must not be null"); + + try { + this.typeInfo = TypeExtractor.createTypeInfo(type); + } catch (Exception e) { + throw new IllegalArgumentException("Cannot create full type information based on the given class. " + + "If the type has generics, Flink needs to capture the generics. Please use constructor " + + "StateDescriptor(String name, TypeInformation typeInfo) instead. You can create the" + + "TypeInformation via the 'TypeInformation.of(TypeHint)' method.", e); + } + } + + // ------------------------------------------------------------------------ + // Serializers + // ------------------------------------------------------------------------ + + /** + * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. + * Note that the serializer may initialized lazily and is only guaranteed to exist after + * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}. + */ + public TypeSerializer getSerializer() { + checkState(typeSerializer != null, "serializer not yet initialized."); + return typeSerializer; + } + + @Override + public boolean isSerializerInitialized() { + return typeSerializer != null; + } + + @Override + public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { + if (typeSerializer == null) { + checkState(typeInfo != null, "Cannot initialize serializer after TypeInformation was dropped during serialization"); + typeSerializer = typeInfo.createSerializer(executionConfig); + } + } + + // ------------------------------------------------------------------------ + // equals() / hashCode() / toString() helpers + // ------------------------------------------------------------------------ + + protected int simpleStateDescrHashCode() { + return typeSerializer != null ? typeSerializer.hashCode() : 123; + } + + protected boolean simpleStateDescrEquals(SimpleStateDescriptor other) { + return this.typeSerializer == null ? other.typeSerializer == null : + (other.typeSerializer != null && this.typeSerializer.equals(other.typeSerializer)); + } + + protected String simpleStateDescrToString() { + return "serializer=" + String.valueOf(typeSerializer); + } + + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ + + private void writeObject(final ObjectOutputStream out) throws IOException { + // make sure we have a serializer before the type information gets lost + // TODO this is the source of loss of type tags in some user programs that create + // TODO the type descriptors on the client side + initializeSerializerUnlessSet(new ExecutionConfig()); + + // write all the non-transient fields + out.defaultWriteObject(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java index 86e9ec8043dac..37b65e3ab79f9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/State.java @@ -21,12 +21,17 @@ import org.apache.flink.annotation.PublicEvolving; /** - * Interface that different types of partitioned state must implement. + * Base interface for all types of keyed state must implement. Sub-classes of this + * interface define the specific type and behavior of the state, for example + * {@link ValueState} or {@link ListState}. * - *

The state is only accessible by functions applied on a KeyedDataStream. The key is - * automatically supplied by the system, so the function always sees the value mapped to the + *

Keyed state is only accessible by functions applied on a {@code KeyedDataStream}, which + * is the result of a {@code DataStream.keyBy()} operation. + * + *

The key is automatically supplied by the system, so the function always sees the value mapped to the * key of the current element. That way, the system can handle stream and state partitioning - * consistently together. + * consistently together, making state accesses local and allowing the system to transparently + * re-shard the state when changing the parallelism of the operation that works with the state. */ @PublicEvolving public interface State { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index b901d03ab2538..5ee8666ca0565 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -20,34 +20,29 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import static java.util.Objects.requireNonNull; /** - * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned - * {@link State} in stateful operations. This contains the name and can create an actual state - * object given a {@link StateBackend} using {@link #bind(StateBackend)}. - * + * Base class for state descriptors. A {@code StateDescriptor} is used for creating + * {@link State keyed state} in stateful operations. The descriptor contains the name of the state, + * the serializer to persist the state type, and the configuration that defines the state + * as queryable. + * + *

Implementation notes

+ * + * The actual state object is created given a {@link StateBackend} using the + * {@link StateDescriptor#bind(StateBackend)} method. + * *

Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}. * * @param The type of the State objects created from this {@code StateDescriptor}. - * @param The type of the value of the state object described by this state descriptor. */ @PublicEvolving -public abstract class StateDescriptor implements Serializable { +public abstract class StateDescriptor implements Serializable { /** * An enumeration of the types of supported states. Used to identify the state type @@ -65,74 +60,22 @@ public enum Type { /** Name that uniquely identifies state created from this StateDescriptor. */ protected final String name; - /** The serializer for the type. May be eagerly initialized in the constructor, - * or lazily once the type is serialized or an ExecutionConfig is provided. */ - protected TypeSerializer serializer; - /** Name for queries against state created from this StateDescriptor. */ private String queryableStateName; - /** The default value returned by the state when no other value is bound to a key */ - protected transient T defaultValue; - - /** The type information describing the value type. Only used to lazily create the serializer - * and dropped during serialization */ - private transient TypeInformation typeInfo; - // ------------------------------------------------------------------------ /** * Create a new {@code StateDescriptor} with the given name and the given type serializer. * * @param name The name of the {@code StateDescriptor}. - * @param serializer The type serializer for the values in the state. - * @param defaultValue The default value that will be set when requesting state without setting - * a value before. - */ - protected StateDescriptor(String name, TypeSerializer serializer, T defaultValue) { - this.name = requireNonNull(name, "name must not be null"); - this.serializer = requireNonNull(serializer, "serializer must not be null"); - this.defaultValue = defaultValue; - } - - /** - * Create a new {@code StateDescriptor} with the given name and the given type information. - * - * @param name The name of the {@code StateDescriptor}. - * @param typeInfo The type information for the values in the state. - * @param defaultValue The default value that will be set when requesting state without setting - * a value before. - */ - protected StateDescriptor(String name, TypeInformation typeInfo, T defaultValue) { - this.name = requireNonNull(name, "name must not be null"); - this.typeInfo = requireNonNull(typeInfo, "type information must not be null"); - this.defaultValue = defaultValue; - } - - /** - * Create a new {@code StateDescriptor} with the given name and the given type information. - * - *

If this constructor fails (because it is not possible to describe the type via a class), - * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor. - * - * @param name The name of the {@code StateDescriptor}. - * @param type The class of the type of values in the state. - * @param defaultValue The default value that will be set when requesting state without setting - * a value before. */ - protected StateDescriptor(String name, Class type, T defaultValue) { + protected StateDescriptor(String name) { this.name = requireNonNull(name, "name must not be null"); - requireNonNull(type, "type class must not be null"); - - try { - this.typeInfo = TypeExtractor.createTypeInfo(type); - } catch (Exception e) { - throw new RuntimeException("Cannot create full type information based on the given class. If the type has generics, please", e); - } - - this.defaultValue = defaultValue; } + // ------------------------------------------------------------------------ + // Core State Descriptor Methods // ------------------------------------------------------------------------ /** @@ -143,32 +86,21 @@ public String getName() { } /** - * Returns the default value. + * Gets the type of the state represented by this descriptor. + * @return The state's type */ - public T getDefaultValue() { - if (defaultValue != null) { - if (serializer != null) { - return serializer.copy(defaultValue); - } else { - throw new IllegalStateException("Serializer not yet initialized."); - } - } else { - return null; - } - } + public abstract Type getType(); /** - * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. - * Note that the serializer may initialized lazily and is only guaranteed to exist after - * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}. + * Creates a new {@link State} on the given {@link StateBackend}. + * + * @param stateBackend The {@code StateBackend} on which to create the {@link State}. */ - public TypeSerializer getSerializer() { - if (serializer != null) { - return serializer; - } else { - throw new IllegalStateException("Serializer not yet initialized."); - } - } + public abstract S bind(StateBackend stateBackend) throws Exception; + + // ------------------------------------------------------------------------ + // Queryable State Options + // ------------------------------------------------------------------------ /** * Sets the name for queries of state created from this descriptor. @@ -207,13 +139,8 @@ public boolean isQueryable() { return queryableStateName != null; } - /** - * Creates a new {@link State} on the given {@link StateBackend}. - * - * @param stateBackend The {@code StateBackend} on which to create the {@link State}. - */ - public abstract S bind(StateBackend stateBackend) throws Exception; - + // ------------------------------------------------------------------------ + // Serializer initialization // ------------------------------------------------------------------------ /** @@ -223,41 +150,14 @@ public boolean isQueryable() { * * @return True if the serializers have been initialized, false otherwise. */ - public boolean isSerializerInitialized() { - return serializer != null; - } + public abstract boolean isSerializerInitialized(); /** * Initializes the serializer, unless it has been initialized before. * * @param executionConfig The execution config to use when creating the serializer. */ - public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { - if (serializer == null) { - if (typeInfo != null) { - serializer = typeInfo.createSerializer(executionConfig); - } else { - throw new IllegalStateException( - "Cannot initialize serializer after TypeInformation was dropped during serialization"); - } - } - } - - /** - * This method should be called by subclasses prior to serialization. Because the TypeInformation is - * not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor - * needs to make sure that the serializer is created before the TypeInformation is dropped. - */ - private void ensureSerializerCreated() { - if (serializer == null) { - if (typeInfo != null) { - serializer = typeInfo.createSerializer(new ExecutionConfig()); - } else { - throw new IllegalStateException( - "Cannot initialize serializer after TypeInformation was dropped during serialization"); - } - } - } + public abstract void initializeSerializerUnlessSet(ExecutionConfig executionConfig); // ------------------------------------------------------------------------ // Standard Utils @@ -270,79 +170,5 @@ private void ensureSerializerCreated() { public abstract boolean equals(Object o); @Override - public String toString() { - return getClass().getSimpleName() + - "{name=" + name + - ", defaultValue=" + defaultValue + - ", serializer=" + serializer + - (isQueryable() ? ", queryableStateName=" + queryableStateName + "" : "") + - '}'; - } - - public abstract Type getType(); - - // ------------------------------------------------------------------------ - // Serialization - // ------------------------------------------------------------------------ - - private void writeObject(final ObjectOutputStream out) throws IOException { - // make sure we have a serializer before the type information gets lost - ensureSerializerCreated(); - - // write all the non-transient fields - out.defaultWriteObject(); - - // write the non-serializable default value field - if (defaultValue == null) { - // we don't have a default value - out.writeBoolean(false); - } else { - // we have a default value - out.writeBoolean(true); - - byte[] serializedDefaultValue; - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) - { - TypeSerializer duplicateSerializer = serializer.duplicate(); - duplicateSerializer.serialize(defaultValue, outView); - - outView.flush(); - serializedDefaultValue = baos.toByteArray(); - } - catch (Exception e) { - throw new IOException("Unable to serialize default value of type " + - defaultValue.getClass().getSimpleName() + ".", e); - } - - out.writeInt(serializedDefaultValue.length); - out.write(serializedDefaultValue); - } - } - - private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { - // read the non-transient fields - in.defaultReadObject(); - - // read the default value field - boolean hasDefaultValue = in.readBoolean(); - if (hasDefaultValue) { - int size = in.readInt(); - - byte[] buffer = new byte[size]; - - in.readFully(buffer); - - try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer); - DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) - { - defaultValue = serializer.deserialize(inView); - } - catch (Exception e) { - throw new IOException("Unable to deserialize default value.", e); - } - } else { - defaultValue = null; - } - } + public abstract String toString(); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java index b3006c4f14397..674235845323f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java @@ -21,9 +21,17 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; /** - * {@link StateDescriptor} for {@link ValueState}. This can be used to create partitioned + * A {@link StateDescriptor} for {@link ValueState}. This can be used to create keyed * value state using * {@link org.apache.flink.api.common.functions.RuntimeContext#getState(ValueStateDescriptor)}. * @@ -33,9 +41,14 @@ * @param The type of the values that the value state can hold. */ @PublicEvolving -public class ValueStateDescriptor extends StateDescriptor, T> { +public class ValueStateDescriptor extends SimpleStateDescriptor> { private static final long serialVersionUID = 1L; - + + /** The default value returned by the state when no other value is bound to a key */ + protected transient T defaultValue; + + // ------------------------------------------------------------------------ + /** * Creates a new {@code ValueStateDescriptor} with the given name, type, and default value. * @@ -52,7 +65,9 @@ public class ValueStateDescriptor extends StateDescriptor, T> { */ @Deprecated public ValueStateDescriptor(String name, Class typeClass, T defaultValue) { - super(name, typeClass, defaultValue); + super(name, typeClass); + + this.defaultValue = defaultValue; } /** @@ -68,7 +83,9 @@ public ValueStateDescriptor(String name, Class typeClass, T defaultValue) { */ @Deprecated public ValueStateDescriptor(String name, TypeInformation typeInfo, T defaultValue) { - super(name, typeInfo, defaultValue); + super(name, typeInfo); + + this.defaultValue = defaultValue; } /** @@ -85,7 +102,9 @@ public ValueStateDescriptor(String name, TypeInformation typeInfo, T defaultV */ @Deprecated public ValueStateDescriptor(String name, TypeSerializer typeSerializer, T defaultValue) { - super(name, typeSerializer, defaultValue); + super(name, typeSerializer); + + this.defaultValue = defaultValue; } /** @@ -98,7 +117,7 @@ public ValueStateDescriptor(String name, TypeSerializer typeSerializer, T def * @param typeClass The type of the values in the state. */ public ValueStateDescriptor(String name, Class typeClass) { - super(name, typeClass, null); + super(name, typeClass); } /** @@ -108,7 +127,7 @@ public ValueStateDescriptor(String name, Class typeClass) { * @param typeInfo The type of the values in the state. */ public ValueStateDescriptor(String name, TypeInformation typeInfo) { - super(name, typeInfo, null); + super(name, typeInfo); } /** @@ -118,16 +137,39 @@ public ValueStateDescriptor(String name, TypeInformation typeInfo) { * @param typeSerializer The type serializer of the values in the state. */ public ValueStateDescriptor(String name, TypeSerializer typeSerializer) { - super(name, typeSerializer, null); + super(name, typeSerializer); } // ------------------------------------------------------------------------ - + // Value State Descriptor + // ------------------------------------------------------------------------ + + @Override + public Type getType() { + return Type.VALUE; + } + @Override public ValueState bind(StateBackend stateBackend) throws Exception { return stateBackend.createValueState(this); } + /** + * Returns the default value. + */ + public T getDefaultValue() { + if (defaultValue == null) { + return null; + } + else { + return getSerializer().copy(defaultValue); + } + } + + // ------------------------------------------------------------------------ + // Standard Utils + // ------------------------------------------------------------------------ + @Override public boolean equals(Object o) { if (this == o) { @@ -138,14 +180,13 @@ public boolean equals(Object o) { } ValueStateDescriptor that = (ValueStateDescriptor) o; - - return serializer.equals(that.serializer) && name.equals(that.name); + return name.equals(that.name) && simpleStateDescrEquals(that); } @Override public int hashCode() { - int result = serializer.hashCode(); + int result = simpleStateDescrHashCode(); result = 31 * result + name.hashCode(); return result; } @@ -155,12 +196,71 @@ public String toString() { return "ValueStateDescriptor{" + "name=" + name + ", defaultValue=" + defaultValue + - ", serializer=" + serializer + + ", " + simpleStateDescrToString() + '}'; } - @Override - public Type getType() { - return Type.VALUE; + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ + + private void writeObject(final ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + + // write the non-serializable default value field + if (defaultValue == null) { + // we don't have a default value + out.writeBoolean(false); + } else { + // we have a default value + out.writeBoolean(true); + + byte[] serializedDefaultValue; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) + { + // we duplicate the type serializer here, because the serializers may be asynchronously + // serialized into asynchronous snapshots + // Note: as of Flink 1.2, only the serializers are written, but not the entire state + // descriptors any more, so we may be safe do drop this? + TypeSerializer duplicateSerializer = getSerializer().duplicate(); + duplicateSerializer.serialize(defaultValue, outView); + + outView.flush(); + serializedDefaultValue = baos.toByteArray(); + } + catch (Exception e) { + throw new IOException("Unable to serialize default value of type " + + defaultValue.getClass().getSimpleName() + ".", e); + } + + out.writeInt(serializedDefaultValue.length); + out.write(serializedDefaultValue); + } + } + + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + + // read the default value field + boolean hasDefaultValue = in.readBoolean(); + if (hasDefaultValue) { + int size = in.readInt(); + + byte[] buffer = new byte[size]; + + in.readFully(buffer); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer); + DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) + { + defaultValue = getSerializer().deserialize(inView); + } + catch (Exception e) { + throw new IOException("Unable to deserialize default value.", e); + } + } else { + defaultValue = null; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/FoldingStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/FoldingStateDescriptor.java new file mode 100644 index 0000000000000..d32ae40e18ccc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/FoldingStateDescriptor.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.migration.api.common.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import static java.util.Objects.requireNonNull; + +/** + * Migration class present only to support resuming old savepoints that contained java-serialized data. + */ +@Internal +@Deprecated +public class FoldingStateDescriptor extends org.apache.flink.migration.api.common.state.StateDescriptor, ACC> { + private static final long serialVersionUID = 1L; + + + private final FoldFunction foldFunction; + + /** + * Creates a new {@code FoldingStateDescriptor} with the given name, type, and initial value. + * + *

If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #FoldingStateDescriptor(String, ACC, FoldFunction, TypeInformation)} constructor. + * + * @param name The (unique) name for the state. + * @param initialValue The initial value of the fold. + * @param foldFunction The {@code FoldFunction} used to aggregate the state. + * @param typeClass The type of the values in the state. + */ + public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction foldFunction, Class typeClass) { + super(name, typeClass, initialValue); + this.foldFunction = requireNonNull(foldFunction); + + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction."); + } + } + + /** + * Creates a new {@code FoldingStateDescriptor} with the given name and default value. + * + * @param name The (unique) name for the state. + * @param initialValue The initial value of the fold. + * @param foldFunction The {@code FoldFunction} used to aggregate the state. + * @param typeInfo The type of the values in the state. + */ + public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction foldFunction, TypeInformation typeInfo) { + super(name, typeInfo, initialValue); + this.foldFunction = requireNonNull(foldFunction); + + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction."); + } + } + + /** + * Creates a new {@code ValueStateDescriptor} with the given name and default value. + * + * @param name The (unique) name for the state. + * @param initialValue The initial value of the fold. + * @param foldFunction The {@code FoldFunction} used to aggregate the state. + * @param typeSerializer The type serializer of the values in the state. + */ + public FoldingStateDescriptor(String name, ACC initialValue, FoldFunction foldFunction, TypeSerializer typeSerializer) { + super(name, typeSerializer, initialValue); + this.foldFunction = requireNonNull(foldFunction); + + if (foldFunction instanceof RichFunction) { + throw new UnsupportedOperationException("FoldFunction of FoldingState can not be a RichFunction."); + } + } + + // ------------------------------------------------------------------------ + + @Override + public FoldingState bind(StateBackend stateBackend) throws Exception { + return stateBackend.createFoldingState(this); + } + + /** + * Returns the fold function to be used for the folding state. + */ + public FoldFunction getFoldFunction() { + return foldFunction; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FoldingStateDescriptor that = (FoldingStateDescriptor) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "FoldingStateDescriptor{" + + "serializer=" + serializer + + ", initialValue=" + defaultValue + + ", foldFunction=" + foldFunction + + '}'; + } + + @Override + public org.apache.flink.api.common.state.StateDescriptor.Type getType() { + return org.apache.flink.api.common.state.StateDescriptor.Type.FOLDING; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java new file mode 100644 index 0000000000000..54130bbe7599e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ListStateDescriptor.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.migration.api.common.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * Migration class present only to support resuming old savepoints that contained java-serialized data. + */ +@Internal +@Deprecated +public class ListStateDescriptor extends StateDescriptor, T> { + private static final long serialVersionUID = 1L; + + /** + * Creates a new {@code ListStateDescriptor} with the given name and list element type. + * + *

If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ListStateDescriptor(String, TypeInformation)} constructor. + * + * @param name The (unique) name for the state. + * @param typeClass The type of the values in the state. + */ + public ListStateDescriptor(String name, Class typeClass) { + super(name, typeClass, null); + } + + /** + * Creates a new {@code ListStateDescriptor} with the given name and list element type. + * + * @param name The (unique) name for the state. + * @param typeInfo The type of the values in the state. + */ + public ListStateDescriptor(String name, TypeInformation typeInfo) { + super(name, typeInfo, null); + } + + /** + * Creates a new {@code ListStateDescriptor} with the given name and list element type. + * + * @param name The (unique) name for the state. + * @param typeSerializer The type serializer for the list values. + */ + public ListStateDescriptor(String name, TypeSerializer typeSerializer) { + super(name, typeSerializer, null); + } + + // ------------------------------------------------------------------------ + + @Override + public ListState bind(StateBackend stateBackend) throws Exception { + return stateBackend.createListState(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ListStateDescriptor that = (ListStateDescriptor) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ListStateDescriptor{" + + "serializer=" + serializer + + '}'; + } + + @Override + public org.apache.flink.api.common.state.StateDescriptor.Type getType() { + return org.apache.flink.api.common.state.StateDescriptor.Type.LIST; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ReducingStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ReducingStateDescriptor.java new file mode 100644 index 0000000000000..47157b207b129 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ReducingStateDescriptor.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.migration.api.common.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import static java.util.Objects.requireNonNull; + +/** + * Migration class present only to support resuming old savepoints that contained java-serialized data. + */ +@Internal +@Deprecated +public class ReducingStateDescriptor extends StateDescriptor, T> { + private static final long serialVersionUID = 1L; + + + private final ReduceFunction reduceFunction; + + /** + * Creates a new {@code ReducingStateDescriptor} with the given name, type, and default value. + * + *

If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ReducingStateDescriptor(String, ReduceFunction, TypeInformation)} constructor. + * + * @param name The (unique) name for the state. + * @param reduceFunction The {@code ReduceFunction} used to aggregate the state. + * @param typeClass The type of the values in the state. + */ + public ReducingStateDescriptor(String name, ReduceFunction reduceFunction, Class typeClass) { + super(name, typeClass, null); + this.reduceFunction = requireNonNull(reduceFunction); + + if (reduceFunction instanceof RichFunction) { + throw new UnsupportedOperationException("ReduceFunction of ReducingState can not be a RichFunction."); + } + } + + /** + * Creates a new {@code ReducingStateDescriptor} with the given name and default value. + * + * @param name The (unique) name for the state. + * @param reduceFunction The {@code ReduceFunction} used to aggregate the state. + * @param typeInfo The type of the values in the state. + */ + public ReducingStateDescriptor(String name, ReduceFunction reduceFunction, TypeInformation typeInfo) { + super(name, typeInfo, null); + this.reduceFunction = requireNonNull(reduceFunction); + } + + /** + * Creates a new {@code ValueStateDescriptor} with the given name and default value. + * + * @param name The (unique) name for the state. + * @param reduceFunction The {@code ReduceFunction} used to aggregate the state. + * @param typeSerializer The type serializer of the values in the state. + */ + public ReducingStateDescriptor(String name, ReduceFunction reduceFunction, TypeSerializer typeSerializer) { + super(name, typeSerializer, null); + this.reduceFunction = requireNonNull(reduceFunction); + } + + // ------------------------------------------------------------------------ + + @Override + public ReducingState bind(StateBackend stateBackend) throws Exception { + return stateBackend.createReducingState(this); + } + + /** + * Returns the reduce function to be used for the reducing state. + */ + public ReduceFunction getReduceFunction() { + return reduceFunction; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ReducingStateDescriptor that = (ReducingStateDescriptor) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ReducingStateDescriptor{" + + "serializer=" + serializer + + ", reduceFunction=" + reduceFunction + + '}'; + } + + @Override + public org.apache.flink.api.common.state.StateDescriptor.Type getType() { + return org.apache.flink.api.common.state.StateDescriptor.Type.REDUCING; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/StateBackend.java new file mode 100644 index 0000000000000..2e3aca493b9ad --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/StateBackend.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.migration.api.common.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ValueState; + +/** + * Migration class present only to support resuming old savepoints that contained java-serialized data. + */ +@Internal +@Deprecated +public interface StateBackend { + + /** + * Creates and returns a new {@link ValueState}. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param The type of the value that the {@code ValueState} can store. + */ + ValueState createValueState(ValueStateDescriptor stateDesc) throws Exception; + + /** + * Creates and returns a new {@link ListState}. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param The type of the values that the {@code ListState} can store. + */ + ListState createListState(ListStateDescriptor stateDesc) throws Exception; + + /** + * Creates and returns a new {@link ReducingState}. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param The type of the values that the {@code ListState} can store. + */ + ReducingState createReducingState(ReducingStateDescriptor stateDesc) throws Exception; + + /** + * Creates and returns a new {@link FoldingState}. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * + * @param Type of the values folded into the state + * @param Type of the value in the state + */ + FoldingState createFoldingState(FoldingStateDescriptor stateDesc) throws Exception; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/StateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/StateDescriptor.java new file mode 100644 index 0000000000000..3dbe3d9e787dc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/StateDescriptor.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.migration.api.common.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import static java.util.Objects.requireNonNull; + +/** + * Migration class present only to support resuming old savepoints that contained java-serialized data. + */ +@Internal +@Deprecated +public abstract class StateDescriptor implements Serializable { + + private static final long serialVersionUID = 1L; + + /** Name that uniquely identifies state created from this StateDescriptor. */ + protected final String name; + + /** The serializer for the type. May be eagerly initialized in the constructor, + * or lazily once the type is serialized or an ExecutionConfig is provided. */ + protected TypeSerializer serializer; + + /** Name for queries against state created from this StateDescriptor. */ + private String queryableStateName; + + /** The default value returned by the state when no other value is bound to a key */ + protected transient T defaultValue; + + /** The type information describing the value type. Only used to lazily create the serializer + * and dropped during serialization */ + private transient TypeInformation typeInfo; + + // ------------------------------------------------------------------------ + + /** + * Create a new {@code StateDescriptor} with the given name and the given type serializer. + * + * @param name The name of the {@code StateDescriptor}. + * @param serializer The type serializer for the values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. + */ + protected StateDescriptor(String name, TypeSerializer serializer, T defaultValue) { + this.name = requireNonNull(name, "name must not be null"); + this.serializer = requireNonNull(serializer, "serializer must not be null"); + this.defaultValue = defaultValue; + } + + /** + * Create a new {@code StateDescriptor} with the given name and the given type information. + * + * @param name The name of the {@code StateDescriptor}. + * @param typeInfo The type information for the values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. + */ + protected StateDescriptor(String name, TypeInformation typeInfo, T defaultValue) { + this.name = requireNonNull(name, "name must not be null"); + this.typeInfo = requireNonNull(typeInfo, "type information must not be null"); + this.defaultValue = defaultValue; + } + + /** + * Create a new {@code StateDescriptor} with the given name and the given type information. + * + *

If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor. + * + * @param name The name of the {@code StateDescriptor}. + * @param type The class of the type of values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. + */ + protected StateDescriptor(String name, Class type, T defaultValue) { + this.name = requireNonNull(name, "name must not be null"); + requireNonNull(type, "type class must not be null"); + + try { + this.typeInfo = TypeExtractor.createTypeInfo(type); + } catch (Exception e) { + throw new RuntimeException("Cannot create full type information based on the given class. If the type has generics, please", e); + } + + this.defaultValue = defaultValue; + } + + // ------------------------------------------------------------------------ + + /** + * Returns the name of this {@code StateDescriptor}. + */ + public String getName() { + return name; + } + + /** + * Returns the default value. + */ + public T getDefaultValue() { + if (defaultValue != null) { + if (serializer != null) { + return serializer.copy(defaultValue); + } else { + throw new IllegalStateException("Serializer not yet initialized."); + } + } else { + return null; + } + } + + /** + * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. + * Note that the serializer may initialized lazily and is only guaranteed to exist after + * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}. + */ + public TypeSerializer getSerializer() { + if (serializer != null) { + return serializer; + } else { + throw new IllegalStateException("Serializer not yet initialized."); + } + } + + /** + * Sets the name for queries of state created from this descriptor. + * + *

If a name is set, the created state will be published for queries + * during runtime. The name needs to be unique per job. If there is another + * state instance published under the same name, the job will fail during runtime. + * + * @param queryableStateName State name for queries (unique name per job) + * @throws IllegalStateException If queryable state name already set + */ + public void setQueryable(String queryableStateName) { + if (this.queryableStateName == null) { + this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name"); + } else { + throw new IllegalStateException("Queryable state name already set"); + } + } + + /** + * Returns the queryable state name. + * + * @return Queryable state name or null if not set. + */ + public String getQueryableStateName() { + return queryableStateName; + } + + /** + * Returns whether the state created from this descriptor is queryable. + * + * @return true if state is queryable, false + * otherwise. + */ + public boolean isQueryable() { + return queryableStateName != null; + } + + /** + * Creates a new {@link State} on the given {@link StateBackend}. + * + * @param stateBackend The {@code StateBackend} on which to create the {@link State}. + */ + public abstract S bind(StateBackend stateBackend) throws Exception; + + // ------------------------------------------------------------------------ + + /** + * Checks whether the serializer has been initialized. Serializer initialization is lazy, + * to allow parametrization of serializers with an {@link ExecutionConfig} via + * {@link #initializeSerializerUnlessSet(ExecutionConfig)}. + * + * @return True if the serializers have been initialized, false otherwise. + */ + public boolean isSerializerInitialized() { + return serializer != null; + } + + /** + * Initializes the serializer, unless it has been initialized before. + * + * @param executionConfig The execution config to use when creating the serializer. + */ + public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { + if (serializer == null) { + if (typeInfo != null) { + serializer = typeInfo.createSerializer(executionConfig); + } else { + throw new IllegalStateException( + "Cannot initialize serializer after TypeInformation was dropped during serialization"); + } + } + } + + /** + * This method should be called by subclasses prior to serialization. Because the TypeInformation is + * not always serializable, it is 'transient' and dropped during serialization. Hence, the descriptor + * needs to make sure that the serializer is created before the TypeInformation is dropped. + */ + private void ensureSerializerCreated() { + if (serializer == null) { + if (typeInfo != null) { + serializer = typeInfo.createSerializer(new ExecutionConfig()); + } else { + throw new IllegalStateException( + "Cannot initialize serializer after TypeInformation was dropped during serialization"); + } + } + } + + // ------------------------------------------------------------------------ + // Standard Utils + // ------------------------------------------------------------------------ + + @Override + public abstract int hashCode(); + + @Override + public abstract boolean equals(Object o); + + @Override + public String toString() { + return getClass().getSimpleName() + + "{name=" + name + + ", defaultValue=" + defaultValue + + ", serializer=" + serializer + + (isQueryable() ? ", queryableStateName=" + queryableStateName + "" : "") + + '}'; + } + + public abstract org.apache.flink.api.common.state.StateDescriptor.Type getType(); + + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ + + private void writeObject(final ObjectOutputStream out) throws IOException { + // make sure we have a serializer before the type information gets lost + ensureSerializerCreated(); + + // write all the non-transient fields + out.defaultWriteObject(); + + // write the non-serializable default value field + if (defaultValue == null) { + // we don't have a default value + out.writeBoolean(false); + } else { + // we have a default value + out.writeBoolean(true); + + byte[] serializedDefaultValue; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) + { + TypeSerializer duplicateSerializer = serializer.duplicate(); + duplicateSerializer.serialize(defaultValue, outView); + + outView.flush(); + serializedDefaultValue = baos.toByteArray(); + } + catch (Exception e) { + throw new IOException("Unable to serialize default value of type " + + defaultValue.getClass().getSimpleName() + ".", e); + } + + out.writeInt(serializedDefaultValue.length); + out.write(serializedDefaultValue); + } + } + + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { + // read the non-transient fields + in.defaultReadObject(); + + // read the default value field + boolean hasDefaultValue = in.readBoolean(); + if (hasDefaultValue) { + int size = in.readInt(); + + byte[] buffer = new byte[size]; + + in.readFully(buffer); + + try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer); + DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) + { + defaultValue = serializer.deserialize(inView); + } + catch (Exception e) { + throw new IOException("Unable to deserialize default value.", e); + } + } else { + defaultValue = null; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ValueStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ValueStateDescriptor.java new file mode 100644 index 0000000000000..e4c4b537d0023 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/migration/api/common/state/ValueStateDescriptor.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.migration.api.common.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * Migration class present only to support resuming old savepoints that contained java-serialized data. + */ +@Internal +@Deprecated +public class ValueStateDescriptor extends StateDescriptor, T> { + private static final long serialVersionUID = 1L; + + /** + * Creates a new {@code ValueStateDescriptor} with the given name, type, and default value. + * + *

If this constructor fails (because it is not possible to describe the type via a class), + * consider using the {@link #ValueStateDescriptor(String, TypeInformation, Object)} constructor. + * + * @param name The (unique) name for the state. + * @param typeClass The type of the values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. + */ + public ValueStateDescriptor(String name, Class typeClass, T defaultValue) { + super(name, typeClass, defaultValue); + } + + /** + * Creates a new {@code ValueStateDescriptor} with the given name and default value. + * + * @param name The (unique) name for the state. + * @param typeInfo The type of the values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. + */ + public ValueStateDescriptor(String name, TypeInformation typeInfo, T defaultValue) { + super(name, typeInfo, defaultValue); + } + + /** + * Creates a new {@code ValueStateDescriptor} with the given name, default value, and the specific + * serializer. + * + * @param name The (unique) name for the state. + * @param typeSerializer The type serializer of the values in the state. + * @param defaultValue The default value that will be set when requesting state without setting + * a value before. + */ + public ValueStateDescriptor(String name, TypeSerializer typeSerializer, T defaultValue) { + super(name, typeSerializer, defaultValue); + } + + // ------------------------------------------------------------------------ + + @Override + public ValueState bind(StateBackend stateBackend) throws Exception { + return stateBackend.createValueState(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ValueStateDescriptor that = (ValueStateDescriptor) o; + + return serializer.equals(that.serializer) && name.equals(that.name); + + } + + @Override + public int hashCode() { + int result = serializer.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ValueStateDescriptor{" + + "name=" + name + + ", defaultValue=" + defaultValue + + ", serializer=" + serializer + + '}'; + } + + @Override + public org.apache.flink.api.common.state.StateDescriptor.Type getType() { + return org.apache.flink.api.common.state.StateDescriptor.Type.VALUE; + } +} + diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java index 9936ca798707b..1fc978e7e9835 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/KvStateSnapshot.java @@ -19,7 +19,7 @@ package org.apache.flink.migration.runtime.state; import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.migration.api.common.state.StateDescriptor; @Deprecated public interface KvStateSnapshot> diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java index 59c373b597b07..e71980ad9e380 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/AbstractFsStateSnapshot.java @@ -19,9 +19,9 @@ package org.apache.flink.migration.runtime.state.filesystem; import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.Path; +import org.apache.flink.migration.api.common.state.StateDescriptor; import org.apache.flink.migration.runtime.state.KvStateSnapshot; import java.io.IOException; diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java index e1bac839d465b..d9b25e1fb2b42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsFoldingState.java @@ -19,9 +19,9 @@ package org.apache.flink.migration.runtime.state.filesystem; import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.Path; +import org.apache.flink.migration.api.common.state.FoldingStateDescriptor; @Deprecated public class FsFoldingState { diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java index d4e3d4b7bac09..31b7b4eff4a24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsListState.java @@ -19,9 +19,9 @@ package org.apache.flink.migration.runtime.state.filesystem; import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.Path; +import org.apache.flink.migration.api.common.state.ListStateDescriptor; import java.util.ArrayList; diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java index 5cd950537f9cf..b292b4737a701 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsReducingState.java @@ -19,9 +19,9 @@ package org.apache.flink.migration.runtime.state.filesystem; import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.Path; +import org.apache.flink.migration.api.common.state.ReducingStateDescriptor; @Deprecated public class FsReducingState { diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java index 3b432a3ef9827..84875a409259c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/filesystem/FsValueState.java @@ -19,9 +19,9 @@ package org.apache.flink.migration.runtime.state.filesystem; import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.Path; +import org.apache.flink.migration.api.common.state.ValueStateDescriptor; @Deprecated public class FsValueState { diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java index 3336556f2f389..f8ec6158a21da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/AbstractMemStateSnapshot.java @@ -19,8 +19,8 @@ package org.apache.flink.migration.runtime.state.memory; import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.migration.api.common.state.StateDescriptor; import org.apache.flink.migration.runtime.state.KvStateSnapshot; import org.apache.flink.runtime.util.DataInputDeserializer; diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java index d6c63c4867255..64037f18b128e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemFoldingState.java @@ -19,8 +19,8 @@ package org.apache.flink.migration.runtime.state.memory; import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.migration.api.common.state.FoldingStateDescriptor; @Deprecated public class MemFoldingState { diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java index 416a898a5972c..1363e189510a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemListState.java @@ -19,8 +19,8 @@ package org.apache.flink.migration.runtime.state.memory; import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.migration.api.common.state.ListStateDescriptor; import java.util.ArrayList; diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java index 52d60a977d52c..c53b4494af8fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemReducingState.java @@ -19,8 +19,8 @@ package org.apache.flink.migration.runtime.state.memory; import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.migration.api.common.state.ReducingStateDescriptor; /** * Heap-backed partitioned {@link ReducingState} that is diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java index ff9bed80af7cb..aee2885254e70 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/memory/MemValueState.java @@ -19,8 +19,8 @@ package org.apache.flink.migration.runtime.state.memory; import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.migration.api.common.state.ValueStateDescriptor; /** * Heap-backed key/value state that is snapshotted into a serialized memory copy. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index c8e0d0df4d74e..2a77aeff6ecec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -242,7 +242,7 @@ public KeyGroupRange getKeyGroupRange() { @Override public S getOrCreateKeyedState( final TypeSerializer namespaceSerializer, - StateDescriptor stateDescriptor) throws Exception { + StateDescriptor stateDescriptor) throws Exception { checkNotNull(namespaceSerializer, "Namespace serializer"); @@ -323,7 +323,7 @@ public FoldingState createFoldingState(FoldingStateDescriptor S getPartitionedState( final N namespace, final TypeSerializer namespaceSerializer, - final StateDescriptor stateDescriptor) throws Exception { + final StateDescriptor stateDescriptor) throws Exception { checkNotNull(namespace, "Namespace"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java index d8b8aa8f5cee9..34885047e5411 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java @@ -93,7 +93,7 @@ public FoldingState getFoldingState(FoldingStateDescriptor S getPartitionedState(StateDescriptor stateDescriptor) throws Exception { + private S getPartitionedState(StateDescriptor stateDescriptor) throws Exception { return keyedStateBackend.getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java index 15e0491f99c2f..1fc6fc86c9806 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java @@ -75,7 +75,7 @@ public interface KeyedStateBackend { */ S getOrCreateKeyedState( TypeSerializer namespaceSerializer, - StateDescriptor stateDescriptor) throws Exception; + StateDescriptor stateDescriptor) throws Exception; /** * Creates or retrieves a partitioned state backed by this state backend. @@ -96,7 +96,7 @@ S getOrCreateKeyedState( S getPartitionedState( N namespace, TypeSerializer namespaceSerializer, - StateDescriptor stateDescriptor) throws Exception; + StateDescriptor stateDescriptor) throws Exception; /** * Closes the backend and releases all resources. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java index 4ac712535158c..b7fbdeb1fe931 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapMergingState.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.SimpleStateDescriptor; import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalMergingState; @@ -40,7 +40,7 @@ * @param The type of State * @param The type of StateDescriptor for the State S */ -public abstract class AbstractHeapMergingState> +public abstract class AbstractHeapMergingState> extends AbstractHeapState implements InternalMergingState { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java index 18b71de57cc2c..34b04fee29335 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.SimpleStateDescriptor; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -43,7 +44,7 @@ * @param The type of State * @param The type of StateDescriptor for the State S */ -public abstract class AbstractHeapState> +public abstract class AbstractHeapState> implements InternalKvState { /** Map containing the actual key/value pairs */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java index 6df3f5df01959..70b8738e29613 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapFoldingState.java @@ -119,7 +119,7 @@ public void add(T value) throws IOException { if (currentValue == null) { keyedMap.put(backend.getCurrentKey(), - foldFunction.fold(stateDesc.getDefaultValue(), value)); + foldFunction.fold(stateDesc.getInitialValue(), value)); } else { keyedMap.put(backend.getCurrentKey(), foldFunction.fold(currentValue, value)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 0fe92e76dfbc5..8601ea0efca78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.SimpleStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -111,7 +112,7 @@ public HeapKeyedStateBackend( @SuppressWarnings("unchecked") private StateTable tryRegisterStateTable( - TypeSerializer namespaceSerializer, StateDescriptor stateDesc) { + TypeSerializer namespaceSerializer, SimpleStateDescriptor stateDesc) { String name = stateDesc.getName(); StateTable stateTable = (StateTable) stateTables.get(name); @@ -158,7 +159,8 @@ public InternalListState createListState( StateTable> stateTable = (StateTable>) stateTables.get(name); RegisteredBackendStateMetaInfo> newMetaInfo = - new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer())); + new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, + new ArrayListSerializer<>(stateDesc.getSerializer())); stateTable = tryRegisterStateTable(stateTable, newMetaInfo); return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java index 512768d84cd08..f09ce3c4c6fa2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java @@ -18,12 +18,13 @@ package org.apache.flink.runtime.checkpoint.savepoint; -import org.apache.flink.api.common.state.ValueStateDescriptor; + import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.migration.api.common.state.ValueStateDescriptor; import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0; import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer; import org.apache.flink.migration.runtime.state.KvStateSnapshot; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java index 7522a617bf792..134ba75625d35 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java @@ -37,7 +37,7 @@ abstract class AbstractQueryableStateOperator implements OneInputStreamOperator { /** State descriptor for the queryable state instance. */ - protected final StateDescriptor stateDescriptor; + protected final StateDescriptor stateDescriptor; /** * Name under which the queryable state is registered. @@ -53,7 +53,7 @@ abstract class AbstractQueryableStateOperator public AbstractQueryableStateOperator( String registrationName, - StateDescriptor stateDescriptor) { + StateDescriptor stateDescriptor) { this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name"); this.stateDescriptor = Preconditions.checkNotNull(stateDescriptor, "State descriptor"); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/QueryableAppendingStateOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/QueryableAppendingStateOperator.java index 7ac14ed1d3b42..e47b05d23df55 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/QueryableAppendingStateOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/QueryableAppendingStateOperator.java @@ -33,7 +33,7 @@ public class QueryableAppendingStateOperator extends AbstractQueryableStateO public QueryableAppendingStateOperator( String registrationName, - StateDescriptor, ?> stateDescriptor) { + StateDescriptor> stateDescriptor) { super(registrationName, stateDescriptor); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/QueryableValueStateOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/QueryableValueStateOperator.java index 49605a923db31..9e606cfb2c7fb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/QueryableValueStateOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/QueryableValueStateOperator.java @@ -33,7 +33,7 @@ public class QueryableValueStateOperator extends AbstractQueryableStateOpera public QueryableValueStateOperator( String registrationName, - StateDescriptor, IN> stateDescriptor) { + StateDescriptor> stateDescriptor) { super(registrationName, stateDescriptor); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 05f2ed555d8d6..2d4832db01c5a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -538,14 +538,14 @@ protected ProcessingTimeService getProcessingTimeService() { * @throws IllegalStateException Thrown, if the key/value state was already initialized. * @throws Exception Thrown, if the state backend cannot create the key/value state. */ - protected S getPartitionedState(StateDescriptor stateDescriptor) throws Exception { + protected S getPartitionedState(StateDescriptor stateDescriptor) throws Exception { return getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor); } protected S getOrCreateKeyedState( TypeSerializer namespaceSerializer, - StateDescriptor stateDescriptor) throws Exception { + StateDescriptor stateDescriptor) throws Exception { if (keyedStateStore != null) { return keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor); @@ -570,7 +570,7 @@ protected S getOrCreateKeyedState( protected S getPartitionedState( N namespace, TypeSerializer namespaceSerializer, - StateDescriptor stateDescriptor) throws Exception { + StateDescriptor stateDescriptor) throws Exception { if (keyedStateStore != null) { return keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index b9c9b9befc80e..872ff13d9de44 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -137,7 +137,7 @@ public FoldingState getFoldingState(FoldingStateDescriptor stateDescriptor) { + private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor stateDescriptor) { Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null"); KeyedStateStore keyedStateStore = operator.getKeyedStateStore(); Preconditions.checkNotNull(keyedStateStore, "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation."); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java index 11a0d6dcdac0a..d622afd84eb5a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java @@ -184,7 +184,7 @@ public interface TriggerContext { * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the * function (function is not part os a KeyedStream). */ - S getPartitionedState(StateDescriptor stateDescriptor); + S getPartitionedState(StateDescriptor stateDescriptor); /** * Retrieves a {@link ValueState} object that can be used to interact with @@ -231,6 +231,6 @@ public interface TriggerContext { * {@link Trigger#onMerge(Window, OnMergeContext)}. */ public interface OnMergeContext extends TriggerContext { - > void mergePartitionedState(StateDescriptor stateDescriptor); + > void mergePartitionedState(StateDescriptor stateDescriptor); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 45fea14a2237b..5e1fc4123c036 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -67,7 +67,7 @@ public class EvictingWindowOperator private final Evictor evictor; - private final StateDescriptor>, ?> evictingWindowStateDescriptor; + private final StateDescriptor>> evictingWindowStateDescriptor; // ------------------------------------------------------------------------ // the fields below are instantiated once the operator runs in the runtime @@ -82,7 +82,7 @@ public EvictingWindowOperator(WindowAssigner windowAssigner, TypeSerializer windowSerializer, KeySelector keySelector, TypeSerializer keySerializer, - StateDescriptor>, ?> windowStateDescriptor, + StateDescriptor>> windowStateDescriptor, InternalWindowFunction, OUT, K, W> windowFunction, Trigger trigger, Evictor evictor, @@ -426,7 +426,7 @@ public void dispose() throws Exception{ @Override @VisibleForTesting @SuppressWarnings("unchecked, rawtypes") - public StateDescriptor>, ?> getStateDescriptor() { - return (StateDescriptor>, ?>) evictingWindowStateDescriptor; + public StateDescriptor>> getStateDescriptor() { + return (StateDescriptor>>) evictingWindowStateDescriptor; } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 3c4f39776ec74..0d446162d4603 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.AppendingState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.SimpleStateDescriptor; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; @@ -114,7 +115,7 @@ public class WindowOperator private final Trigger trigger; - private final StateDescriptor, ?> windowStateDescriptor; + private final StateDescriptor> windowStateDescriptor; /** For serializing the key in checkpoints. */ protected final TypeSerializer keySerializer; @@ -197,7 +198,7 @@ public WindowOperator( TypeSerializer windowSerializer, KeySelector keySelector, TypeSerializer keySerializer, - StateDescriptor, ?> windowStateDescriptor, + StateDescriptor> windowStateDescriptor, InternalWindowFunction windowFunction, Trigger trigger, long allowedLateness) { @@ -214,7 +215,7 @@ public WindowOperator( TypeSerializer windowSerializer, KeySelector keySelector, TypeSerializer keySerializer, - StateDescriptor, ?> windowStateDescriptor, + StateDescriptor> windowStateDescriptor, InternalWindowFunction windowFunction, Trigger trigger, long allowedLateness, @@ -685,7 +686,7 @@ public ValueState getKeyValueState(String name, } @SuppressWarnings("unchecked") - public S getPartitionedState(StateDescriptor stateDescriptor) { + public S getPartitionedState(StateDescriptor stateDescriptor) { try { return WindowOperator.this.getPartitionedState(window, windowSerializer, stateDescriptor); } catch (Exception e) { @@ -694,7 +695,7 @@ public S getPartitionedState(StateDescriptor stateDescri } @Override - public > void mergePartitionedState(StateDescriptor stateDescriptor) { + public > void mergePartitionedState(StateDescriptor stateDescriptor) { if (mergedWindows != null && mergedWindows.size() > 0) { try { S rawState = getKeyedStateBackend().getOrCreateKeyedState(windowSerializer, stateDescriptor); @@ -906,7 +907,7 @@ private void restoreElementsFromLegacyAccumulatingAlignedWindowOperator(DataInpu long nextElementTimestamp = nextSlideTime - (numPanes * paneSize); @SuppressWarnings("unchecked") - ArrayListSerializer ser = new ArrayListSerializer<>((TypeSerializer) getStateDescriptor().getSerializer()); + ArrayListSerializer ser = new ArrayListSerializer<>(getStateSerializer(getStateDescriptor())); while (numPanes > 0) { validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt()); @@ -942,7 +943,7 @@ private void restoreElementsFromLegacyAggregatingAlignedWindowOperator(DataInput K key = keySerializer.deserialize(in); @SuppressWarnings("unchecked") - IN value = (IN) getStateDescriptor().getSerializer().deserialize(in); + IN value = (IN) getStateSerializer(getStateDescriptor()).deserialize(in); restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(value, nextElementTimestamp)); } numPanes--; @@ -1074,6 +1075,16 @@ public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception { restoredFromLegacyAlignedOpRecords = null; } + @SuppressWarnings("unchecked") + private TypeSerializer getStateSerializer(StateDescriptor stateDescriptor) { + if (stateDescriptor instanceof SimpleStateDescriptor) { + SimpleStateDescriptor simpleStateDescriptor = (SimpleStateDescriptor) stateDescriptor; + return simpleStateDescriptor.getSerializer(); + } else { + throw new IllegalStateException(); + } + } + // ------------------------------------------------------------------------ // Getters for testing // ------------------------------------------------------------------------ @@ -1094,7 +1105,7 @@ public WindowAssigner getWindowAssigner() { } @VisibleForTesting - public StateDescriptor, ?> getStateDescriptor() { + public StateDescriptor> getStateDescriptor() { return windowStateDescriptor; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java index c0ca6a00367d7..293b0dbc698a0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.SimpleStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; @@ -200,10 +201,10 @@ public void apply(TimeWindow window, Iterable input, Collector out private void validateStateDescriptorConfigured(SingleOutputStreamOperator result) { OneInputTransformation transform = (OneInputTransformation) result.getTransformation(); WindowOperator op = (WindowOperator) transform.getOperator(); - StateDescriptor descr = op.getStateDescriptor(); + StateDescriptor descr = op.getStateDescriptor(); // this would be the first statement to fail if state descriptors were not properly initialized - TypeSerializer serializer = descr.getSerializer(); + TypeSerializer serializer = ((SimpleStateDescriptor) descr).getSerializer(); assertTrue(serializer instanceof KryoSerializer); Kryo kryo = ((KryoSerializer) serializer).getKryo(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 2791726cecc3f..060c0d83c52da 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -63,15 +63,15 @@ import static org.mockito.Mockito.when; public class StreamingRuntimeContextTest { - + @Test public void testValueStateInstantiation() throws Exception { - + final ExecutionConfig config = new ExecutionConfig(); config.registerKryoType(Path.class); - + final AtomicReference descriptorCapture = new AtomicReference<>(); - + StreamingRuntimeContext context = new StreamingRuntimeContext( createDescriptorCapturingMockOp(descriptorCapture, config), createMockEnvironment(), @@ -79,10 +79,12 @@ public void testValueStateInstantiation() throws Exception { ValueStateDescriptor descr = new ValueStateDescriptor<>("name", TaskInfo.class); context.getState(descr); - - StateDescriptor descrIntercepted = (StateDescriptor) descriptorCapture.get(); - TypeSerializer serializer = descrIntercepted.getSerializer(); - + + StateDescriptor descrIntercepted = (StateDescriptor) descriptorCapture.get(); + assertTrue(descrIntercepted instanceof ValueStateDescriptor); + + TypeSerializer serializer = ((ValueStateDescriptor)descrIntercepted).getSerializer(); + // check that the Path class is really registered, i.e., the execution config was applied assertTrue(serializer instanceof KryoSerializer); assertTrue(((KryoSerializer) serializer).getKryo().getRegistration(Path.class).getId() > 0); @@ -103,14 +105,16 @@ public void testReducingStateInstantiation() throws Exception { @SuppressWarnings("unchecked") ReduceFunction reducer = (ReduceFunction) mock(ReduceFunction.class); - - ReducingStateDescriptor descr = + + ReducingStateDescriptor descr = new ReducingStateDescriptor<>("name", reducer, TaskInfo.class); - + context.getReducingState(descr); - StateDescriptor descrIntercepted = (StateDescriptor) descriptorCapture.get(); - TypeSerializer serializer = descrIntercepted.getSerializer(); + StateDescriptor descrIntercepted = (StateDescriptor) descriptorCapture.get(); + assertTrue(descrIntercepted instanceof ReducingStateDescriptor); + + TypeSerializer serializer = ((ReducingStateDescriptor)descrIntercepted).getSerializer(); // check that the Path class is really registered, i.e., the execution config was applied assertTrue(serializer instanceof KryoSerializer); @@ -162,8 +166,10 @@ public void testListStateInstantiation() throws Exception { ListStateDescriptor descr = new ListStateDescriptor<>("name", TaskInfo.class); context.getListState(descr); - StateDescriptor descrIntercepted = (StateDescriptor) descriptorCapture.get(); - TypeSerializer serializer = descrIntercepted.getSerializer(); + StateDescriptor descrIntercepted = (StateDescriptor) descriptorCapture.get(); + assertTrue(descrIntercepted instanceof ListStateDescriptor); + + TypeSerializer serializer = ((ListStateDescriptor)descrIntercepted).getSerializer(); // check that the Path class is really registered, i.e., the execution config was applied assertTrue(serializer instanceof KryoSerializer); @@ -185,15 +191,15 @@ public void testListStateReturnsEmptyListByDefault() throws Exception { assertNotNull(value); assertFalse(value.iterator().hasNext()); } - + // ------------------------------------------------------------------------ // // ------------------------------------------------------------------------ - + @SuppressWarnings("unchecked") private static AbstractStreamOperator createDescriptorCapturingMockOp( final AtomicReference ref, final ExecutionConfig config) throws Exception { - + AbstractStreamOperator operatorMock = mock(AbstractStreamOperator.class); KeyedStateBackend keyedStateBackend= mock(KeyedStateBackend.class); @@ -251,7 +257,7 @@ public ListState answer(InvocationOnMock invocationOnMock) throws Throwa when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore); return operatorMock; } - + private static Environment createMockEnvironment() { Environment env = mock(Environment.class); when(env.getUserClassLoader()).thenReturn(StreamingRuntimeContextTest.class.getClassLoader()); @@ -259,4 +265,4 @@ private static Environment createMockEnvironment() { when(env.getTaskInfo()).thenReturn(new TaskInfo("test task", 1, 0, 1, 1)); return env; } -} +} \ No newline at end of file diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java index b9923f2bb3ba9..3b3d305ed0443 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java @@ -321,7 +321,7 @@ public void deleteEventTimeTimer(long time) { } @Override - public S getPartitionedState(StateDescriptor stateDescriptor) { + public S getPartitionedState(StateDescriptor stateDescriptor) { try { return stateBackend.getPartitionedState(window, windowSerializer, stateDescriptor); } catch (Exception e) { @@ -359,7 +359,7 @@ public TestOnMergeContext( } @Override - public > void mergePartitionedState(StateDescriptor stateDescriptor) { + public > void mergePartitionedState(StateDescriptor stateDescriptor) { try { S rawState = stateBackend.getOrCreateKeyedState(windowSerializer, stateDescriptor); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index 7c4d71171f808..1a11941b20a3a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -2301,7 +2301,7 @@ private KeyedOneInputStreamOperatorTestHarness assigner, Trigger trigger, long allowedLatenss, - StateDescriptor, ?> stateDescriptor, + StateDescriptor> stateDescriptor, InternalWindowFunction windowFunction) throws Exception { KeySelector keySelector = new KeySelector() {