Skip to content
Permalink
Browse files
[FLINK-27560][python] Refactor SimpleStateRequestHandler to support b…
…oth keyed state and operator state

This closes #19687.
  • Loading branch information
Vancior authored and dianfu committed May 12, 2022
1 parent 7ec6556 commit 63f0871d0f3378e3c7a2716db4fa7b41f319e2d9
Showing 16 changed files with 1,210 additions and 700 deletions.
@@ -87,6 +87,7 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
config.get(MAP_STATE_WRITE_CACHE_SIZE)),
getFlinkMetricContainer(),
null,
getOperatorStateBackend(),
null,
null,
null,
@@ -126,6 +126,7 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
config.get(MAP_STATE_WRITE_CACHE_SIZE)),
getFlinkMetricContainer(),
getKeyedStateBackend(),
getOperatorStateBackend(),
keyTypeSerializer,
null,
new TimerRegistration(
@@ -156,6 +156,7 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
config.get(MAP_STATE_WRITE_CACHE_SIZE)),
getFlinkMetricContainer(),
getKeyedStateBackend(),
getOperatorStateBackend(),
keyTypeSerializer,
namespaceSerializer,
new TimerRegistration(
@@ -84,6 +84,7 @@ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
config.get(MAP_STATE_WRITE_CACHE_SIZE)),
getFlinkMetricContainer(),
null,
getOperatorStateBackend(),
null,
null,
null,
@@ -25,6 +25,7 @@
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.util.Preconditions;
@@ -73,7 +74,8 @@ public BeamDataStreamPythonFunctionRunner(
String headOperatorFunctionUrn,
List<FlinkFnApi.UserDefinedDataStreamFunction> userDefinedDataStreamFunctions,
@Nullable FlinkMetricContainer flinkMetricContainer,
@Nullable KeyedStateBackend<?> stateBackend,
@Nullable KeyedStateBackend<?> keyedStateBackend,
@Nullable OperatorStateBackend operatorStateBackend,
@Nullable TypeSerializer<?> keySerializer,
@Nullable TypeSerializer<?> namespaceSerializer,
@Nullable TimerRegistration timerRegistration,
@@ -87,7 +89,8 @@ public BeamDataStreamPythonFunctionRunner(
taskName,
environmentManager,
flinkMetricContainer,
stateBackend,
keyedStateBackend,
operatorStateBackend,
keySerializer,
namespaceSerializer,
timerRegistration,
@@ -34,7 +34,9 @@
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
import org.apache.flink.streaming.api.runners.python.beam.state.BeamStateRequestHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.LongFunctionWithException;

@@ -120,6 +122,8 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {

@Nullable private final KeyedStateBackend<?> keyedStateBackend;

@Nullable private final OperatorStateBackend operatorStateBackend;

@Nullable private final TypeSerializer<?> keySerializer;

@Nullable private final TypeSerializer<?> namespaceSerializer;
@@ -187,6 +191,7 @@ public BeamPythonFunctionRunner(
ProcessPythonEnvironmentManager environmentManager,
@Nullable FlinkMetricContainer flinkMetricContainer,
@Nullable KeyedStateBackend<?> keyedStateBackend,
@Nullable OperatorStateBackend operatorStateBackend,
@Nullable TypeSerializer<?> keySerializer,
@Nullable TypeSerializer<?> namespaceSerializer,
@Nullable TimerRegistration timerRegistration,
@@ -199,6 +204,7 @@ public BeamPythonFunctionRunner(
this.environmentManager = Preconditions.checkNotNull(environmentManager);
this.flinkMetricContainer = flinkMetricContainer;
this.keyedStateBackend = keyedStateBackend;
this.operatorStateBackend = operatorStateBackend;
this.keySerializer = keySerializer;
this.namespaceSerializer = namespaceSerializer;
this.timerRegistration = timerRegistration;
@@ -219,7 +225,11 @@ public void open(ReadableConfig config) throws Exception {

stateRequestHandler =
getStateRequestHandler(
keyedStateBackend, keySerializer, namespaceSerializer, config);
keyedStateBackend,
operatorStateBackend,
keySerializer,
namespaceSerializer,
config);

// The creation of stageBundleFactory depends on the initialized environment manager.
environmentManager.open();
@@ -641,16 +651,16 @@ private TimerReceiverFactory createTimerReceiverFactory() {
}

private static StateRequestHandler getStateRequestHandler(
KeyedStateBackend<?> keyedStateBackend,
TypeSerializer<?> keySerializer,
TypeSerializer<?> namespaceSerializer,
@Nullable KeyedStateBackend<?> keyedStateBackend,
@Nullable OperatorStateBackend operatorStateBackend,
@Nullable TypeSerializer<?> keySerializer,
@Nullable TypeSerializer<?> namespaceSerializer,
ReadableConfig config) {
if (keyedStateBackend == null) {
return StateRequestHandler.unsupported();
} else {
assert keySerializer != null;
return new SimpleStateRequestHandler(
keyedStateBackend, keySerializer, namespaceSerializer, config);
}
return BeamStateRequestHandler.of(
keyedStateBackend,
operatorStateBackend,
keySerializer,
namespaceSerializer,
config);
}
}

0 comments on commit 63f0871

Please sign in to comment.