Skip to content

Commit caf4672

Browse files
aljoscha authored and StephanEwen committed
[FLINK-3201] Enhance Partitioned State Interface with State Types
Add the new state types ValueState, ListState and ReducingState, where ListState and ReducingState derive from the interface MergingState. ValueState behaves exactly the same as OperatorState. MergingState is a stateful list to which elements can be added and from which the contained elements can be retrieved. With a ListState the list of elements is actually kept; with a ReducingState a reduce function is used to combine all added elements into one. To create a ValueState the user passes a ValueStateDescriptor to StreamingRuntimeContext.getPartitionedState(), while they would pass a ListStateDescriptor or ReducingStateDescriptor for the other state types. This change is necessary to give the system more information about the nature of the operator state. We need this in order to support incremental snapshots. That would not be possible, for example, if the user kept a List as state: inside OperatorState this list would be opaque and Flink could not create good incremental snapshots. This also refactors the StateBackend. Before, the logic for partitioned state was spread out over StreamingRuntimeContext, AbstractStreamOperator and StateBackend. Now it is consolidated in StateBackend. This also adds support for partitioned state in two-input operators.
1 parent 680c2c3 commit caf4672

110 files changed

Lines changed: 4566 additions & 1819 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java

Lines changed: 57 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -17,17 +17,26 @@
1717

1818
package org.apache.flink.contrib.streaming.state;
1919

20-
import java.io.IOException;
2120
import java.io.Serializable;
2221
import java.sql.Connection;
2322
import java.sql.PreparedStatement;
2423
import java.sql.SQLException;
24+
import java.util.ArrayList;
2525
import java.util.Random;
2626
import java.util.concurrent.Callable;
2727

28+
import org.apache.flink.api.common.state.ListState;
29+
import org.apache.flink.api.common.state.ListStateDescriptor;
30+
import org.apache.flink.api.common.state.ReducingState;
31+
import org.apache.flink.api.common.state.ReducingStateDescriptor;
32+
import org.apache.flink.api.common.state.ValueState;
33+
import org.apache.flink.api.common.state.ValueStateDescriptor;
2834
import org.apache.flink.api.common.typeutils.TypeSerializer;
2935
import org.apache.flink.runtime.execution.Environment;
30-
import org.apache.flink.runtime.state.StateBackend;
36+
import org.apache.flink.runtime.state.AbstractStateBackend;
37+
import org.apache.flink.runtime.state.ArrayListSerializer;
38+
import org.apache.flink.runtime.state.GenericListState;
39+
import org.apache.flink.runtime.state.GenericReducingState;
3140
import org.apache.flink.runtime.state.StateHandle;
3241
import org.apache.flink.util.InstantiationUtil;
3342
import org.slf4j.Logger;
@@ -36,9 +45,9 @@
3645
import static org.apache.flink.contrib.streaming.state.SQLRetrier.retry;
3746

3847
/**
39-
* {@link StateBackend} for storing checkpoints in JDBC supporting databases.
48+
* {@link AbstractStateBackend} for storing checkpoints in JDBC supporting databases.
4049
* Key-Value state is stored out-of-core and is lazily fetched using the
41-
* {@link LazyDbKvState} implementation. A different backend can also be
50+
* {@link LazyDbValueState} implementation. A different backend can also be
4251
* provided in the constructor to store the non-partitioned states. A common use
4352
* case would be to store the key-value states in the database and store larger
4453
* non-partitioned states on a distributed file system.
@@ -56,7 +65,7 @@
5665
* {@link MySqlAdapter} can be supplied in the {@link DbBackendConfig}.
5766
*
5867
*/
59-
public class DbStateBackend extends StateBackend<DbStateBackend> {
68+
public class DbStateBackend extends AbstractStateBackend {
6069

6170
private static final long serialVersionUID = 1L;
6271
private static final Logger LOG = LoggerFactory.getLogger(DbStateBackend.class);
@@ -79,10 +88,12 @@ public class DbStateBackend extends StateBackend<DbStateBackend> {
7988

8089
private transient PreparedStatement insertStatement;
8190

91+
private String operatorIdentifier;
92+
8293
// ------------------------------------------------------
8394

8495
// We allow using a different backend for storing non-partitioned states
85-
private StateBackend<?> nonPartitionedStateBackend = null;
96+
private AbstractStateBackend nonPartitionedStateBackend = null;
8697

8798
// ------------------------------------------------------
8899

@@ -104,7 +115,7 @@ public DbStateBackend(DbBackendConfig backendConfig) {
104115
* non-partitioned state snapshots.
105116
*
106117
*/
107-
public DbStateBackend(DbBackendConfig backendConfig, StateBackend<?> backend) {
118+
public DbStateBackend(DbBackendConfig backendConfig, AbstractStateBackend backend) {
108119
this(backendConfig);
109120
this.nonPartitionedStateBackend = backend;
110121
}
@@ -160,7 +171,7 @@ public DbStateHandle<S> call() throws Exception {
160171

161172
insertStatement.executeUpdate();
162173

163-
return new DbStateHandle<S>(appIdShort, checkpointID, timestamp, handleId,
174+
return new DbStateHandle<>(appIdShort, checkpointID, timestamp, handleId,
164175
dbConfig, serializedState.length);
165176
}
166177
}, numSqlRetries, sqlRetrySleep);
@@ -182,20 +193,46 @@ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkp
182193
}
183194

184195
@Override
185-
public <K, V> LazyDbKvState<K, V> createKvState(String stateId, String stateName,
186-
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
187-
return new LazyDbKvState<K, V>(
188-
stateId + "_" + env.getApplicationID().toShortString(),
189-
env.getTaskInfo().getIndexOfThisSubtask() == 0,
190-
getConnections(),
191-
getConfiguration(),
192-
keySerializer,
193-
valueSerializer,
194-
defaultValue);
196+
protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
197+
ValueStateDescriptor<T> stateDesc) throws Exception {
198+
String stateName = operatorIdentifier + "_"+ stateDesc.getName();
199+
200+
return new LazyDbValueState<>(
201+
stateName,
202+
env.getTaskInfo().getIndexOfThisSubtask() == 0,
203+
getConnections(),
204+
getConfiguration(),
205+
keySerializer,
206+
namespaceSerializer,
207+
stateDesc);
208+
}
209+
210+
@Override
211+
protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
212+
ListStateDescriptor<T> stateDesc) throws Exception {
213+
ValueStateDescriptor<ArrayList<T>> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), null, new ArrayListSerializer<>(stateDesc.getSerializer()));
214+
ValueState<ArrayList<T>> valueState = createValueState(namespaceSerializer, valueStateDescriptor);
215+
return new GenericListState<>(valueState);
216+
}
217+
218+
@Override
219+
@SuppressWarnings("unchecked")
220+
protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
221+
ReducingStateDescriptor<T> stateDesc) throws Exception {
222+
223+
ValueStateDescriptor<T> valueStateDescriptor = new ValueStateDescriptor<>(stateDesc.getName(), null, stateDesc.getSerializer());
224+
ValueState<T> valueState = createValueState(namespaceSerializer, valueStateDescriptor);
225+
return new GenericReducingState<>(valueState, stateDesc.getReduceFunction());
195226
}
196227

197228
@Override
198-
public void initializeForJob(final Environment env) throws Exception {
229+
public void initializeForJob(final Environment env,
230+
String operatorIdentifier,
231+
TypeSerializer<?> keySerializer) throws Exception {
232+
super.initializeForJob(env, operatorIdentifier, keySerializer);
233+
234+
this.operatorIdentifier = operatorIdentifier;
235+
199236
this.rnd = new Random();
200237
this.env = env;
201238

@@ -221,7 +258,7 @@ public PreparedStatement call() throws SQLException {
221258
}
222259
}, numSqlRetries, sqlRetrySleep);
223260
} else {
224-
nonPartitionedStateBackend.initializeForJob(env);
261+
nonPartitionedStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);
225262
}
226263

227264
if (LOG.isDebugEnabled()) {

0 commit comments

Comments
 (0)