[FLINK-5991] [DataStream] Expose Union ListState for operator state
This commit exposes the union list state scheme for managed operator state.
The actual functionality had already been added to the
`DefaultOperatorStateBackend`, so this change simply exposes the feature
through the `OperatorStateStore` interface.

This commit also updates the documentation for managed operator state so
that it covers the new union list state scheme. It sharpens the
distinction between keyed and non-keyed state data structures by
emphasizing their semantic differences in the Javadocs of the state access methods.

This closes #3508.
tzulitai committed Apr 19, 2017
1 parent a1aab64 commit 2ef4900
Showing 5 changed files with 122 additions and 61 deletions.
109 changes: 71 additions & 38 deletions docs/dev/stream/state.md
@@ -64,16 +64,12 @@ for one or more Key Groups.


With *Operator State* (or *non-keyed state*), each operator state is With *Operator State* (or *non-keyed state*), each operator state is
bound to one parallel operator instance. bound to one parallel operator instance.
The Kafka source connector is a good motivating example for the use of Operator State The [Kafka Connector](../connectors/kafka.html) is a good motivating example for the use of Operator State
in Flink. Each parallel instance of this Kafka consumer maintains a map in Flink. Each parallel instance of the Kafka consumer maintains a map
of topic partitions and offsets as its Operator State. of topic partitions and offsets as its Operator State.


The Operator State interfaces support redistributing state among The Operator State interfaces support redistributing state among
parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution; the following are currently defined: parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.

- **List-style redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators.
Each operator gets a sublist, which can be empty, or contain one or more elements.


## Raw and Managed State ## Raw and Managed State


@@ -233,45 +229,44 @@ val counts: DataStream[(String, Int)] = stream


## Using Managed Operator State ## Using Managed Operator State


A stateful function can implement either the more general `CheckpointedFunction` To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction`
interface, or the `ListCheckpointed<T extends Serializable>` interface. interface, or the `ListCheckpointed<T extends Serializable>` interface.


In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, #### CheckpointedFunction
thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
while `(test2, 2)` will go to task 1.

##### ListCheckpointed


The `ListCheckpointed` interface requires the implementation of two methods: The `CheckpointedFunction` interface provides access to non-keyed state with different

redistribution schemes. It requires the implementation of two methods:
{% highlight java %}
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;
{% endhighlight %}

On `snapshotState()` the operator should return a list of objects to checkpoint and
`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.

##### CheckpointedFunction

The `CheckpointedFunction` interface also requires the implementation of two methods:


{% highlight java %} {% highlight java %}
void snapshotState(FunctionSnapshotContext context) throws Exception; void snapshotState(FunctionSnapshotContext context) throws Exception;


void initializeState(FunctionInitializationContext context) throws Exception; void initializeState(FunctionInitializationContext context) throws Exception;
{% endhighlight %} {% endhighlight %}


Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`,
or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not is called every time the user-defined function is initialized, be that when the function is first initialized
or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
only the place where different types of state are initialized, but also where state recovery logic is included. only the place where different types of state are initialized, but also where state recovery logic is included.


This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that Currently, list-style managed operator state is supported. The state
uses state to buffer elements before sending them to the outside world: is expected to be a `List` of *serializable* objects, independent from each other,
thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
non-keyed state can be redistributed. Depending on the state accessing method,
the following redistribution schemes are defined:

- **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators.
Each operator gets a sublist, which can be empty, or contain one or more elements.
As an example, if with parallelism 1 the checkpointed state of an operator
contains elements `element1` and `element2`, when increasing the parallelism to 2, `element1` may end up in operator instance 0,
while `element2` will go to operator instance 1.

- **Union redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
all lists. On restore/redistribution, each operator gets the complete list of state elements.
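The difference between the two schemes can be illustrated without any Flink dependency. The following is a minimal plain-Java sketch, not Flink's repartitioning code: the class `RedistributionSketch` and its methods are hypothetical, and dealing elements out in round-robin order is an assumption made here to obtain an even division.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration only; hypothetical names, not Flink's implementation. */
class RedistributionSketch {

    /** Even-split: the concatenated state is dealt out evenly to the new instances. */
    static <T> List<List<T>> evenSplit(List<List<T>> oldState, int newParallelism) {
        List<T> all = concat(oldState);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        // Assumed round-robin order; sublists differ in size by at most one element.
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }

    /** Union: every new instance receives the complete concatenated list. */
    static <T> List<List<T>> union(List<List<T>> oldState, int newParallelism) {
        List<T> all = concat(oldState);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    /** The logical whole state: a concatenation of the lists of all old instances. */
    private static <T> List<T> concat(List<List<T>> lists) {
        List<T> all = new ArrayList<>();
        for (List<T> list : lists) {
            all.addAll(list);
        }
        return all;
    }
}
```

With one old instance holding `element1` and `element2` and a new parallelism of 2, `evenSplit` hands one element to each instance, while `union` gives both instances the full two-element list.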

Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction`
to buffer elements before sending them to the outside world. It demonstrates
the basic even-split redistribution list state:


{% highlight java %} {% highlight java %}
public class BufferingSink public class BufferingSink
@@ -311,8 +306,13 @@ public class BufferingSink


@Override @Override
public void initializeState(FunctionInitializationContext context) throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception {
checkpointedState = context.getOperatorStateStore(). ListStateDescriptor<Tuple2<String, Integer>> descriptor =
getSerializableListState("buffered-elements"); new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);


if (context.isRestored()) { if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) { for (Tuple2<String, Integer> element : checkpointedState.get()) {
@@ -329,12 +329,29 @@ public class BufferingSink
} }
{% endhighlight %} {% endhighlight %}



The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize
the non-keyed state "containers". These are a container of type `ListState` where the non-keyed state objects the non-keyed state "containers". These are a container of type `ListState` where the non-keyed state objects
are going to be stored upon checkpointing. are going to be stored upon checkpointing.


`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");` Note how the state is initialized, similar to keyed state,
with a `StateDescriptor` that contains the state name and information
about the type of the value that the state holds:

{% highlight java %}
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);
{% endhighlight %}

The name of each state access method consists of its redistribution
pattern followed by its state structure. For example, to use list state with the
union redistribution scheme on restore, access the state with `getUnionListState(descriptor)`.
If the method name does not contain a redistribution pattern, *e.g.* `getListState(descriptor)`,
the basic even-split redistribution scheme is used.
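The mapping from method name to redistribution mode can be sketched in isolation. In this commit, `DefaultOperatorStateBackend` delegates `getListState` to `OperatorStateHandle.Mode.SPLIT_DISTRIBUTE` and `getUnionListState` to `OperatorStateHandle.Mode.BROADCAST`, and its test expects an `IllegalStateException` when a state is accessed again with a different mode. The class below is a hypothetical stand-in that mimics only this behavior; it takes a bare state name instead of a descriptor and returns the mode rather than a `ListState`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an operator state store; not Flink's implementation. */
class StateAccessSketch {

    /** Mirrors the two mode names that appear in the commit. */
    enum Mode { SPLIT_DISTRIBUTE, BROADCAST }

    /** Remembers the mode each state name was first registered with. */
    private final Map<String, Mode> registered = new HashMap<>();

    /** No pattern in the method name: basic even-split redistribution. */
    Mode getListState(String stateName) {
        return register(stateName, Mode.SPLIT_DISTRIBUTE);
    }

    /** "Union" in the method name: each instance gets the full list on restore. */
    Mode getUnionListState(String stateName) {
        return register(stateName, Mode.BROADCAST);
    }

    private Mode register(String stateName, Mode mode) {
        Mode previous = registered.putIfAbsent(stateName, mode);
        if (previous != null && previous != mode) {
            // A state name cannot switch redistribution schemes once registered.
            throw new IllegalStateException(
                "State '" + stateName + "' already registered with mode " + previous);
        }
        return mode;
    }
}
```

The practical consequence is that each state name should be accessed through the same method every time, for instance always through `getUnionListState` for a state that every instance needs in full.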


After initializing the container, we use the `isRestored()` method of the context to check if we are After initializing the container, we use the `isRestored()` method of the context to check if we are
recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied. recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
@@ -346,6 +363,22 @@ of all objects included by the previous checkpoint, and is then filled with the
As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done
using the provided `FunctionInitializationContext`. using the provided `FunctionInitializationContext`.


#### ListCheckpointed

The `ListCheckpointed` interface is a more limited variant of `CheckpointedFunction`
that supports only list-style state with the even-split redistribution scheme on restore.
It also requires the implementation of two methods:

{% highlight java %}
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;
{% endhighlight %}

On `snapshotState()` the operator should return a list of objects to checkpoint and
`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
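The singleton-list pattern can be sketched as follows. To keep the example free of Flink dependencies, `ListCheckpointedSketch` is a local stand-in that copies only the two method signatures shown above, and `CountingFunctionSketch` is a hypothetical function whose entire state is one counter.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/** Local stand-in with the two signatures shown above; not the Flink interface. */
interface ListCheckpointedSketch<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

/** Hypothetical function whose whole state is a single counter. */
class CountingFunctionSketch implements ListCheckpointedSketch<Long> {

    private long count;

    /** Counts one processed element. */
    void process(Object element) {
        count++;
    }

    long getCount() {
        return count;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // The counter is checkpointed as one indivisible list element.
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After rescaling, an instance may receive zero, one, or several elements.
        count = 0;
        for (Long partial : state) {
            count += partial;
        }
    }
}
```

Summing in `restoreState` is a design choice of this sketch: with even-split redistribution an instance may be handed several former counters after scaling in, or none after scaling out, so the restore logic should cope with any list size.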

### Stateful Source Functions ### Stateful Source Functions


Stateful sources require a bit more care as opposed to other operators. Stateful sources require a bit more care as opposed to other operators.
@@ -33,7 +33,16 @@ public interface OperatorStateStore {
* Creates (or restores) a list state. Each state is registered under a unique name. * Creates (or restores) a list state. Each state is registered under a unique name.
* The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore). * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
* *
* The items in the list are repartitionable by the system in case of changed operator parallelism. * <p>Note the semantic differences between an operator list state and a keyed list state
* (see {@link KeyedStateStore#getListState(ListStateDescriptor)}). Under the context of operator state,
* the list is a collection of state items that are independent from each other and eligible for redistribution
* across operator instances in case of changed operator parallelism. In other words, these state items are
* the finest granularity at which non-keyed state can be redistributed, and should not be correlated with
* each other.
*
* <p>The redistribution scheme of this list state upon operator rescaling is a round-robin pattern, such that
* the logical whole state (a concatenation of all the lists of state elements previously managed by each operator
* before the restore) is evenly divided into as many sublists as there are parallel operators.
* *
* @param stateDescriptor The descriptor for this state, providing a name and serializer. * @param stateDescriptor The descriptor for this state, providing a name and serializer.
* @param <S> The generic type of the state * @param <S> The generic type of the state
@@ -43,8 +52,33 @@ public interface OperatorStateStore {
*/ */
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception; <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;


/**
* Creates (or restores) a list state. Each state is registered under a unique name.
* The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
*
* <p>Note the semantic differences between an operator list state and a keyed list state
* (see {@link KeyedStateStore#getListState(ListStateDescriptor)}). Under the context of operator state,
* the list is a collection of state items that are independent from each other and eligible for redistribution
* across operator instances in case of changed operator parallelism. In other words, these state items are
* the finest granularity at which non-keyed state can be redistributed, and should not be correlated with
* each other.
*
* <p>The redistribution scheme of this list state upon operator rescaling is a broadcast pattern, such that
* the logical whole state (a concatenation of all the lists of state elements previously managed by each operator
* before the restore) is restored to all parallel operators so that each of them will get the union of all state
* items before the restore.
*
* @param stateDescriptor The descriptor for this state, providing a name and serializer.
* @param <S> The generic type of the state
*
* @return A list for all state partitions.
* @throws Exception
*/
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

/** /**
* Returns a set with the names of all currently registered states. * Returns a set with the names of all currently registered states.
*
* @return set of names for all registered states. * @return set of names for all registered states.
*/ */
Set<String> getRegisteredStateNames(); Set<String> getRegisteredStateNames();
@@ -95,12 +95,8 @@ public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) thr
return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE); return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
} }


@SuppressWarnings("unchecked") @Override
public <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName) throws Exception { public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
return (ListState<T>) getBroadcastOperatorState(new ListStateDescriptor<>(stateName, javaSerializer));
}

public <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
return getListState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST); return getListState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST);
} }


@@ -55,7 +55,7 @@ public class OperatorStateBackendTest {
public void testCreateOnAbstractStateBackend() throws Exception { public void testCreateOnAbstractStateBackend() throws Exception {
// we use the memory state backend as a subclass of the AbstractStateBackend // we use the memory state backend as a subclass of the AbstractStateBackend
final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(); final AbstractStateBackend abstractStateBackend = new MemoryStateBackend();
OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend( final OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
createMockEnvironment(), "test-operator"); createMockEnvironment(), "test-operator");


assertNotNull(operatorStateBackend); assertNotNull(operatorStateBackend);
@@ -75,7 +75,7 @@ public void testRegisterStatesWithoutTypeSerializer() throws Exception {
final ExecutionConfig cfg = new ExecutionConfig(); final ExecutionConfig cfg = new ExecutionConfig();
cfg.registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class); cfg.registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class);


final DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg); final OperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(classLoader, cfg);


ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class); ListStateDescriptor<File> stateDescriptor = new ListStateDescriptor<>("test", File.class);
ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class); ListStateDescriptor<String> stateDescriptor2 = new ListStateDescriptor<>("test2", String.class);
@@ -107,7 +107,7 @@ public void testRegisterStatesWithoutTypeSerializer() throws Exception {


@Test @Test
public void testRegisterStates() throws Exception { public void testRegisterStates() throws Exception {
final DefaultOperatorStateBackend operatorStateBackend = final OperatorStateBackend operatorStateBackend =
new DefaultOperatorStateBackend(classLoader, new ExecutionConfig()); new DefaultOperatorStateBackend(classLoader, new ExecutionConfig());


ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
@@ -140,7 +140,7 @@ public void testRegisterStates() throws Exception {
assertEquals(23, it.next()); assertEquals(23, it.next());
assertTrue(!it.hasNext()); assertTrue(!it.hasNext());


ListState<Serializable> listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3); ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
assertNotNull(listState3); assertNotNull(listState3);
assertEquals(3, operatorStateBackend.getRegisteredStateNames().size()); assertEquals(3, operatorStateBackend.getRegisteredStateNames().size());
assertTrue(!it.hasNext()); assertTrue(!it.hasNext());
@@ -176,7 +176,7 @@ public void testRegisterStates() throws Exception {
assertTrue(!it.hasNext()); assertTrue(!it.hasNext());


try { try {
operatorStateBackend.getBroadcastOperatorState(stateDescriptor2); operatorStateBackend.getUnionListState(stateDescriptor2);
fail("Did not detect changed mode"); fail("Did not detect changed mode");
} catch (IllegalStateException ignored) { } catch (IllegalStateException ignored) {


@@ -194,7 +194,7 @@ public void testRegisterStates() throws Exception {
public void testSnapshotEmpty() throws Exception { public void testSnapshotEmpty() throws Exception {
final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);


final DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend) final OperatorStateBackend operatorStateBackend =
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator"); abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator");


CheckpointStreamFactory streamFactory = CheckpointStreamFactory streamFactory =
@@ -211,15 +211,15 @@ public void testSnapshotEmpty() throws Exception {
public void testSnapshotRestore() throws Exception { public void testSnapshotRestore() throws Exception {
AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);


DefaultOperatorStateBackend operatorStateBackend = (DefaultOperatorStateBackend) OperatorStateBackend operatorStateBackend =
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name"); abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name");


ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>());
ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>()); ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>());
ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1); ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1);
ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2); ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2);
ListState<Serializable> listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3); ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);


listState1.add(42); listState1.add(42);
listState1.add(4711); listState1.add(4711);
@@ -242,8 +242,7 @@ public void testSnapshotRestore() throws Exception {
operatorStateBackend.close(); operatorStateBackend.close();
operatorStateBackend.dispose(); operatorStateBackend.dispose();


//TODO this is temporarily casted to test already functionality that we do not yet expose through public API operatorStateBackend = abstractStateBackend.createOperatorStateBackend(
operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend(
createMockEnvironment(), createMockEnvironment(),
"testOperator"); "testOperator");


@@ -253,7 +252,7 @@ public void testSnapshotRestore() throws Exception {


listState1 = operatorStateBackend.getListState(stateDescriptor1); listState1 = operatorStateBackend.getListState(stateDescriptor1);
listState2 = operatorStateBackend.getListState(stateDescriptor2); listState2 = operatorStateBackend.getListState(stateDescriptor2);
listState3 = operatorStateBackend.getBroadcastOperatorState(stateDescriptor3); listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);


assertEquals(3, operatorStateBackend.getRegisteredStateNames().size()); assertEquals(3, operatorStateBackend.getRegisteredStateNames().size());


@@ -145,7 +145,7 @@ public void testSavepointRescalingOutKeyedStateDerivedMaxParallelism() throws Ex
} }


/** /**
* Tests that a a job with purely keyed state can be restarted from a savepoint * Tests that a job with purely keyed state can be restarted from a savepoint
* with a different parallelism. * with a different parallelism.
*/ */
public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism) throws Exception { public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism) throws Exception {
@@ -993,10 +993,9 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception {
public void initializeState(FunctionInitializationContext context) throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception {


if (broadcast) { if (broadcast) {
//TODO this is temporarily casted to test already functionality that we do not yet expose through public API this.counterPartitions = context
DefaultOperatorStateBackend operatorStateStore = (DefaultOperatorStateBackend) context.getOperatorStateStore(); .getOperatorStateStore()
this.counterPartitions = .getUnionListState(new ListStateDescriptor<>("counter_partitions", IntSerializer.INSTANCE));
operatorStateStore.getBroadcastSerializableListState("counter_partitions");
} else { } else {
this.counterPartitions = context this.counterPartitions = context
.getOperatorStateStore() .getOperatorStateStore()