Skip to content

Commit

Permalink
[FLINK-3201] Add operator state to make change backwards compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Feb 3, 2016
1 parent 524e56b commit 6f75596
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 61 deletions.
Expand Up @@ -32,9 +32,13 @@


import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.derby.drda.NetworkServerControl; import org.apache.derby.drda.NetworkServerControl;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend;
Expand All @@ -46,6 +50,7 @@
import org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.IdentityKeySelector; import org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.IdentityKeySelector;
import org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.NonSerializableLong; import org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.NonSerializableLong;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase; import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;

import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;


Expand Down Expand Up @@ -182,7 +187,7 @@ public void restoreState(Integer state) {


private static class OnceFailingPartitionedSum extends RichMapFunction<Integer, Tuple2<Integer, Long>> { private static class OnceFailingPartitionedSum extends RichMapFunction<Integer, Tuple2<Integer, Long>> {


private static Map<Integer, Long> allSums = new ConcurrentHashMap<Integer, Long>(); private static Map<Integer, Long> allSums = new ConcurrentHashMap<>();


private static volatile boolean hasFailed = false; private static volatile boolean hasFailed = false;


Expand All @@ -191,7 +196,7 @@ private static class OnceFailingPartitionedSum extends RichMapFunction<Integer,
private long failurePos; private long failurePos;
private long count; private long count;


private OperatorState<Long> sum; private ValueState<Long> sum;


OnceFailingPartitionedSum(long numElements) { OnceFailingPartitionedSum(long numElements) {
this.numElements = numElements; this.numElements = numElements;
Expand All @@ -204,7 +209,8 @@ public void open(Configuration parameters) throws IOException {


failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
count = 0; count = 0;
sum = getRuntimeContext().getKeyValueState("my_state", Long.class, 0L); sum = getRuntimeContext().getPartitionedState(
new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE));
} }


@Override @Override
Expand All @@ -224,15 +230,19 @@ public Tuple2<Integer, Long> map(Integer value) throws Exception {


private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> { private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> {


private static Map<Integer, Long> allCounts = new ConcurrentHashMap<Integer, Long>(); private static Map<Integer, Long> allCounts = new ConcurrentHashMap<>();


private OperatorState<NonSerializableLong> aCounts; private ValueState<NonSerializableLong> aCounts;
private OperatorState<Long> bCounts; private ValueState<Long> bCounts;


@Override @Override
public void open(Configuration parameters) throws IOException { public void open(Configuration parameters) throws IOException {
aCounts = getRuntimeContext().getKeyValueState("a", NonSerializableLong.class, NonSerializableLong.of(0L)); aCounts = getRuntimeContext().getPartitionedState(
bCounts = getRuntimeContext().getKeyValueState("b", Long.class, 0L); new ValueStateDescriptor<>("a", NonSerializableLong.of(0L),
new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig())));

bCounts = getRuntimeContext().getPartitionedState(
new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE));
} }


@Override @Override
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.StateDescriptor;
Expand Down Expand Up @@ -145,6 +146,7 @@ public interface RuntimeContext {
/** /**
* Convenience function to create a counter object for histograms. * Convenience function to create a counter object for histograms.
*/ */
@Experimental
Histogram getHistogram(String name); Histogram getHistogram(String name);


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -208,13 +210,11 @@ public interface RuntimeContext {
* *
* keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() {
* *
* private ValueStateDescriptor<Long> countIdentifier =
* new ValueStateDescriptor<>("count", 0L, LongSerializer.INSTANCE);
*
* private ValueState<Long> count; * private ValueState<Long> count;
* *
* public void open(Configuration cfg) { * public void open(Configuration cfg) {
* state = getRuntimeContext().getPartitionedState(countIdentifier); * state = getRuntimeContext().getPartitionedState(
* new ValueStateDescriptor<Long>("count", 0L, LongSerializer.INSTANCE));
* } * }
* *
* public Tuple2<MyType, Long> map(MyType value) { * public Tuple2<MyType, Long> map(MyType value) {
Expand Down Expand Up @@ -291,9 +291,11 @@ public interface RuntimeContext {
* *
* @throws UnsupportedOperationException Thrown, if no key/value state is available for the * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
* function (function is not part of a KeyedStream). * function (function is not part of a KeyedStream).
*
* @deprecated Use the more expressive {@link #getPartitionedState(StateDescriptor)} instead.
*/ */
@Deprecated @Deprecated
<S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState); <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);


/** /**
* Gets the key/value state, which is only accessible if the function is executed on * Gets the key/value state, which is only accessible if the function is executed on
Expand Down Expand Up @@ -330,7 +332,6 @@ public interface RuntimeContext {
* *
* }</pre> * }</pre>
* *
*
* @param name The name of the key/value state. * @param name The name of the key/value state.
* @param stateType The type information for the type that is stored in the state. * @param stateType The type information for the type that is stored in the state.
* Used to create serializers for managed memory and checkpoints. * Used to create serializers for managed memory and checkpoints.
Expand All @@ -342,7 +343,9 @@ public interface RuntimeContext {
* *
* @throws UnsupportedOperationException Thrown, if no key/value state is available for the * @throws UnsupportedOperationException Thrown, if no key/value state is available for the
* function (function is not part of a KeyedStream). * function (function is not part of a KeyedStream).
*
* @deprecated Use the more expressive {@link #getPartitionedState(StateDescriptor)} instead.
*/ */
@Deprecated @Deprecated
<S> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState); <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
} }
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.StateDescriptor;
Expand Down Expand Up @@ -179,14 +180,14 @@ public <S extends State> S getPartitionedState(StateDescriptor<S> stateDescripto


@Override @Override
@Deprecated @Deprecated
public <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream"); "This state is only accessible by functions executed on a KeyedStream");
} }


@Override @Override
@Deprecated @Deprecated
public <S> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream"); "This state is only accessible by functions executed on a KeyedStream");
} }
Expand Down
Expand Up @@ -18,35 +18,34 @@


package org.apache.flink.api.common.state; package org.apache.flink.api.common.state;


import org.apache.flink.annotation.Public;

import java.io.IOException; import java.io.IOException;


/** /**
* This state interface abstracts persistent key/value state in streaming programs. * This state interface abstracts persistent key/value state in streaming programs.
* The state is accessed and modified by user functions, and checkpointed consistently * The state is accessed and modified by user functions, and checkpointed consistently
* by the system as part of the distributed snapshots. * by the system as part of the distributed snapshots.
* *
* <p>The state is only accessible by functions applied on a KeyedDataStream. The key is * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
* automatically supplied by the system, so the function always sees the value mapped to the * automatically supplied by the system, so the function always sees the value mapped to the
* key of the current element. That way, the system can handle stream and state partitioning * key of the current element. That way, the system can handle stream and state partitioning
* consistently together. * consistently together.
* *
* @param <T> Type of the value in the operator state * @param <T> Type of the value in the operator state
*
* @deprecated OperatorState has been replaced by {@link ValueState}.
*/ */
@Public
@Deprecated @Deprecated
public interface OperatorState<T> { public interface OperatorState<T> extends State {


/** /**
* Returns the current value for the state. When the state is not * Returns the current value for the state. When the state is not
* partitioned the returned value is the same for all inputs in a given * partitioned the returned value is the same for all inputs in a given
* operator instance. If state partitioning is applied, the value returned * operator instance. If state partitioning is applied, the value returned
* depends on the current operator input, as the operator maintains an * depends on the current operator input, as the operator maintains an
* independent state for each partition. * independent state for each partition.
* *
* @return The operator state value corresponding to the current input. * @return The operator state value corresponding to the current input.
* *
* @throws IOException Thrown if the system cannot access the state. * @throws IOException Thrown if the system cannot access the state.
*/ */
T value() throws IOException; T value() throws IOException;
Expand All @@ -57,12 +56,12 @@ public interface OperatorState<T> {
* partition) the returned state will represent the updated value. When a * partition) the returned state will represent the updated value. When a
* partitioned state is updated with null, the state for the current key * partitioned state is updated with null, the state for the current key
* will be removed and the default value is returned on the next access. * will be removed and the default value is returned on the next access.
* *
* @param value * @param value
* The new value for the state. * The new value for the state.
* *
* @throws IOException Thrown if the system cannot access the state. * @throws IOException Thrown if the system cannot access the state.
*/ */
void update(T value) throws IOException; void update(T value) throws IOException;

} }
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */

package org.apache.flink.api.common.state; package org.apache.flink.api.common.state;


/** /**
Expand All @@ -26,5 +27,9 @@
* consistently together. * consistently together.
*/ */
public interface State { public interface State {

/**
* Removes the value mapped under the current key.
*/
void clear(); void clear();
} }
Expand Up @@ -41,7 +41,7 @@ public abstract class StateDescriptor<S extends State> implements Serializable {
* @param name The name of the {@code StateDescriptor}. * @param name The name of the {@code StateDescriptor}.
*/ */
public StateDescriptor(String name) { public StateDescriptor(String name) {
this.name = requireNonNull(name);; this.name = requireNonNull(name);
} }


/** /**
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueState;
Expand Down Expand Up @@ -148,7 +149,7 @@ public DistributedCache getDistributedCache() {
} }


@Override @Override
public <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


Expand Down
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext; import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor;
Expand Down Expand Up @@ -116,7 +116,7 @@ public <S extends State> S getPartitionedState(StateDescriptor<S> stateDescripto


@Override @Override
@Deprecated @Deprecated
public <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
requireNonNull(stateType, "The state type class must not be null"); requireNonNull(stateType, "The state type class must not be null");


TypeInformation<S> typeInfo; TypeInformation<S> typeInfo;
Expand All @@ -134,7 +134,7 @@ public <S> ValueState<S> getKeyValueState(String name, Class<S> stateType, S def


@Override @Override
@Deprecated @Deprecated
public <S> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
requireNonNull(name, "The name of the state must not be null"); requireNonNull(name, "The name of the state must not be null");
requireNonNull(stateType, "The state type information must not be null"); requireNonNull(stateType, "The state type information must not be null");


Expand Down
Expand Up @@ -494,7 +494,7 @@ public boolean triggerCheckpoint(final long checkpointId, final long timestamp)
// start a Thread that does the asynchronous materialization and // start a Thread that does the asynchronous materialization and
// then sends the checkpoint acknowledge // then sends the checkpoint acknowledge


String threadName = "Materialize checkpoint " + checkpointId + " for " + getName(); String threadName = "Materialize checkpoint state " + checkpointId + " - " + getName();
Thread checkpointThread = new Thread(threadName) { Thread checkpointThread = new Thread(threadName) {
@Override @Override
public void run() { public void run() {
Expand Down
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala


import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream} import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
Expand Down Expand Up @@ -298,10 +299,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]


val cleanFun = clean(fun) val cleanFun = clean(fun)
val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]] val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(getExecutionConfig)


val filterFun = new RichFilterFunction[T] with StatefulFunction[T, Boolean, S] { val filterFun = new RichFilterFunction[T] with StatefulFunction[T, Boolean, S] {


override val stateType: TypeInformation[S] = stateTypeInfo override val stateSerializer: TypeSerializer[S] = serializer


override def filter(in: T): Boolean = { override def filter(in: T): Boolean = {
applyWithState(in, cleanFun) applyWithState(in, cleanFun)
Expand All @@ -326,10 +328,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]


val cleanFun = clean(fun) val cleanFun = clean(fun)
val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]] val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(getExecutionConfig)


val mapper = new RichMapFunction[T, R] with StatefulFunction[T, R, S] { val mapper = new RichMapFunction[T, R] with StatefulFunction[T, R, S] {


override val stateType: TypeInformation[S] = stateTypeInfo override val stateSerializer: TypeSerializer[S] = serializer


override def map(in: T): R = { override def map(in: T): R = {
applyWithState(in, cleanFun) applyWithState(in, cleanFun)
Expand All @@ -354,10 +357,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]


val cleanFun = clean(fun) val cleanFun = clean(fun)
val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]] val stateTypeInfo: TypeInformation[S] = implicitly[TypeInformation[S]]
val serializer: TypeSerializer[S] = stateTypeInfo.createSerializer(getExecutionConfig)


val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T,TraversableOnce[R],S]{ val flatMapper = new RichFlatMapFunction[T, R] with StatefulFunction[T,TraversableOnce[R],S]{


override val stateType: TypeInformation[S] = stateTypeInfo override val stateSerializer: TypeSerializer[S] = serializer


override def flatMap(in: T, out: Collector[R]): Unit = { override def flatMap(in: T, out: Collector[R]): Unit = {
applyWithState(in, cleanFun) foreach out.collect applyWithState(in, cleanFun) foreach out.collect
Expand Down
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.streaming.api.scala.function package org.apache.flink.streaming.api.scala.function


import org.apache.flink.api.common.functions.RichFunction import org.apache.flink.api.common.functions.RichFunction
import org.apache.flink.api.common.state.OperatorState import org.apache.flink.api.common.state.{ValueStateDescriptor, ValueState}
import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration import org.apache.flink.configuration.Configuration


/** /**
Expand All @@ -29,9 +29,11 @@ import org.apache.flink.configuration.Configuration
* call the applyWithState method in his own RichFunction implementation. * call the applyWithState method in his own RichFunction implementation.
*/ */
trait StatefulFunction[I, O, S] extends RichFunction { trait StatefulFunction[I, O, S] extends RichFunction {

protected val stateSerializer: TypeSerializer[S]

private[this] var state: ValueState[S] = _


var state: OperatorState[S] = _
val stateType: TypeInformation[S]


def applyWithState(in: I, fun: (I, Option[S]) => (O, Option[S])): O = { def applyWithState(in: I, fun: (I, Option[S]) => (O, Option[S])): O = {
val (o, s: Option[S]) = fun(in, Option(state.value())) val (o, s: Option[S]) = fun(in, Option(state.value()))
Expand All @@ -43,6 +45,7 @@ trait StatefulFunction[I, O, S] extends RichFunction {
} }


override def open(c: Configuration) = { override def open(c: Configuration) = {
state = getRuntimeContext().getKeyValueState[S]("state", stateType, null.asInstanceOf[S]) val info = new ValueStateDescriptor[S]("state", null.asInstanceOf[S], stateSerializer)
state = getRuntimeContext().getPartitionedState[ValueState[S]](info)
} }
} }

0 comments on commit 6f75596

Please sign in to comment.