diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java index 87a775993facd..c1cdfe4a09eec 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java @@ -56,33 +56,6 @@ public interface OperatorStateStore { */ ListState getSerializableListState(String stateName) throws Exception; - /** - * Creates (or restores) a list state. Each state is registered under a unique name. - * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore). - * - * On restore, all items in the list are broadcasted to all parallel operator instances. - * - * @param stateDescriptor The descriptor for this state, providing a name and serializer. - * @param The generic type of the state - * - * @return A list for all state partitions. - * @throws Exception - */ - ListState getBroadcastOperatorState(ListStateDescriptor stateDescriptor) throws Exception; - - /** - * Creates a state of the given name that uses Java serialization to persist the state. On restore, all items - * in the list are broadcasted to all parallel operator instances. - * - *

This is a simple convenience method. For more flexibility on how state serialization - * should happen, use the {@link #getBroadcastOperatorState(ListStateDescriptor)} method. - * - * @param stateName The name of state to create - * @return A list state using Java serialization to serialize state objects. - * @throws Exception - */ - ListState getBroadcastSerializableListState(String stateName) throws Exception; - /** * Returns a set with the names of all currently registered states. * @return set of names for all registered states. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 6c650888b17f7..1cd1da782e706 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -91,12 +91,10 @@ public ListState getOperatorState(ListStateDescriptor stateDescriptor) } @SuppressWarnings("unchecked") - @Override public ListState getBroadcastSerializableListState(String stateName) throws Exception { return (ListState) getBroadcastOperatorState(new ListStateDescriptor<>(stateName, javaSerializer)); } - @Override public ListState getBroadcastOperatorState(ListStateDescriptor stateDescriptor) throws Exception { return getOperatorState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index cd0391faf7759..5bd085f03cb4a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -45,8 +45,9 @@ static Environment createMockEnvironment() { return env; } - private OperatorStateBackend createNewOperatorStateBackend() throws Exception { - return abstractStateBackend.createOperatorStateBackend( + private DefaultOperatorStateBackend createNewOperatorStateBackend() throws Exception { + //TODO this is temporarily casted to test already functionality that we do not yet expose through public API + return (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend( createMockEnvironment(), "test-operator"); } @@ -60,7 +61,7 @@ public void testCreateNew() throws Exception { @Test public void testRegisterStates() throws Exception { - OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); ListStateDescriptor stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); ListStateDescriptor stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); ListStateDescriptor stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>()); @@ -143,7 +144,7 @@ public void testRegisterStates() throws Exception { @Test public void testSnapshotRestore() throws Exception { - OperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); ListStateDescriptor stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); ListStateDescriptor stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); ListStateDescriptor stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>()); @@ -171,7 +172,8 @@ public void testSnapshotRestore() throws Exception { operatorStateBackend.close(); operatorStateBackend.dispose(); - operatorStateBackend = abstractStateBackend.createOperatorStateBackend( + //TODO this is temporarily casted to test already functionality that we do not yet expose through public API + operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend( createMockEnvironment(), "testOperator"); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 45fcc2533f942..bd1678ef41c10 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -969,8 +970,10 @@ public void snapshotState(FunctionSnapshotContext context) throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception { if (broadcast) { + //TODO this is temporarily casted to test already functionality that we do not yet expose through public API + DefaultOperatorStateBackend operatorStateStore = (DefaultOperatorStateBackend) context.getOperatorStateStore(); this.counterPartitions = - context.getOperatorStateStore().getBroadcastSerializableListState("counter_partitions"); + operatorStateStore.getBroadcastSerializableListState("counter_partitions"); } else { this.counterPartitions = context.getOperatorStateStore().getSerializableListState("counter_partitions");