From f75c7bde35eb9aae1da4c6ad87030cbb8af3b87d Mon Sep 17 00:00:00 2001 From: "xiaogang.sxg" Date: Thu, 23 Mar 2017 23:32:15 +0800 Subject: [PATCH 1/2] Add KeyedStateHandle for the snapshots in keyed streams --- .../state/RocksDBKeyedStateBackend.java | 47 +++++++++---- .../state/RocksDBAsyncSnapshotTest.java | 3 +- .../state/RocksDBStateBackendTest.java | 21 +++--- .../cep/operator/CEPMigration12to13Test.java | 14 ++-- .../apache/flink/migration/MigrationUtil.java | 10 +-- .../checkpoint/StateAssignmentOperation.java | 41 ++++++------ .../runtime/checkpoint/SubtaskState.java | 14 ++-- .../savepoint/SavepointV1Serializer.java | 42 +++++++----- .../state/AbstractKeyedStateBackend.java | 2 +- .../runtime/state/KeyGroupsStateHandle.java | 39 ++++------- .../flink/runtime/state/KeyedStateHandle.java | 40 +++++++++++ .../state/StateInitializationContextImpl.java | 28 +++++++- .../flink/runtime/state/TaskStateHandles.java | 16 ++--- .../state/heap/HeapKeyedStateBackend.java | 48 ++++++++++---- .../checkpoint/CheckpointCoordinatorTest.java | 30 +++++---- .../CheckpointStateRestoreTest.java | 3 +- .../savepoint/MigrationV0ToV1Test.java | 14 +++- .../KeyedStateCheckpointOutputStreamTest.java | 4 +- .../runtime/state/StateBackendTestBase.java | 66 +++++++++---------- ...eyedStateBackendSnapshotMigrationTest.java | 3 +- .../api/operators/AbstractStreamOperator.java | 17 +++-- .../api/operators/OperatorSnapshotResult.java | 9 +-- .../runtime/tasks/OperatorStateHandles.java | 14 ++-- .../streaming/runtime/tasks/StreamTask.java | 7 +- .../operators/OperatorSnapshotResultTest.java | 5 +- .../StateInitializationContextImplTest.java | 9 +-- .../tasks/InterruptSensitiveRestoreTest.java | 17 ++--- .../runtime/tasks/StreamTaskTest.java | 5 +- .../AbstractStreamOperatorTestHarness.java | 25 +++---- ...eyedOneInputStreamOperatorTestHarness.java | 17 ++--- ...eyedTwoInputStreamOperatorTestHarness.java | 3 +- 31 files changed, 371 insertions(+), 242 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 2ce527f2643c8..89e4153590bab 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -40,6 +40,7 @@ import org.apache.flink.migration.MigrationNamespaceSerializerProxy; import org.apache.flink.migration.MigrationUtil; import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.migration.state.MigrationKeyGroupStateHandle; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; @@ -52,6 +53,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; @@ -257,7 +259,7 @@ private boolean hasRegisteredState() { * @throws Exception */ @Override - public RunnableFuture snapshot( + public RunnableFuture snapshot( final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, @@ -286,8 +288,8 @@ public RunnableFuture snapshot( } // implementation of the async IO operation, based on FutureTask - AbstractAsyncIOCallable ioCallable = - new AbstractAsyncIOCallable() { + AbstractAsyncIOCallable ioCallable = + new AbstractAsyncIOCallable() { @Override public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception { @@ -620,7 +622,7 @@ private static void checkInterrupted() throws InterruptedException { } @Override - public void restore(Collection restoreState) throws Exception { + public void restore(Collection restoreState) throws Exception { LOG.info("Initializing RocksDB keyed state backend from snapshot."); if (LOG.isDebugEnabled()) { @@ -669,17 +671,23 @@ public RocksDBRestoreOperation(RocksDBKeyedStateBackend rocksDBKeyedStateBack /** * Restores all key-groups data that is referenced by the passed state handles. * - * @param keyGroupsStateHandles List of all key groups state handles that shall be restored. + * @param keyedStateHandles List of all key groups state handles that shall be restored. * @throws IOException * @throws ClassNotFoundException * @throws RocksDBException */ - public void doRestore(Collection keyGroupsStateHandles) + public void doRestore(Collection keyedStateHandles) throws IOException, ClassNotFoundException, RocksDBException { - for (KeyGroupsStateHandle keyGroupsStateHandle : keyGroupsStateHandles) { - if (keyGroupsStateHandle != null) { - this.currentKeyGroupsStateHandle = keyGroupsStateHandle; + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + if (keyedStateHandle != null) { + + if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); + } + this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; restoreKeyGroupsInStateHandle(); } } @@ -761,6 +769,13 @@ private void restoreKVStateMetaData() throws IOException, ClassNotFoundException private void restoreKVStateData() throws IOException, RocksDBException { //for all key-groups in the current state handle... for (Tuple2 keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { + int keyGroup = keyGroupOffset.f0; + + // Skip those key groups that do not belong to the backend + if (!rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup)) { + continue; + } + long offset = keyGroupOffset.f1; //not empty key-group? if (0L != offset) { @@ -1143,15 +1158,25 @@ public File getInstanceBasePath() { * For backwards compatibility, remove again later! */ @Deprecated - private void restoreOldSavepointKeyedState(Collection restoreState) throws Exception { + private void restoreOldSavepointKeyedState(Collection restoreState) throws Exception { if (restoreState.isEmpty()) { return; } Preconditions.checkState(1 == restoreState.size(), "Only one element expected here."); + + KeyedStateHandle keyedStateHandle = restoreState.iterator().next(); + if (!(keyedStateHandle instanceof MigrationKeyGroupStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + MigrationKeyGroupStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); + } + + MigrationKeyGroupStateHandle keyGroupStateHandle = (MigrationKeyGroupStateHandle) keyedStateHandle; + HashMap namedStates; - try (FSDataInputStream inputStream = restoreState.iterator().next().openInputStream()) { + try (FSDataInputStream inputStream = keyGroupStateHandle.openInputStream()) { namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader); } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 90de7a687e6b9..ffe2ce209164a 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; @@ -343,7 +344,7 @@ public void testCleanupOfSnapshotsInFailureCase() throws Exception { StringSerializer.INSTANCE, new ValueStateDescriptor<>("foobar", String.class)); - RunnableFuture snapshotFuture = keyedStateBackend.snapshot( + RunnableFuture snapshotFuture = keyedStateBackend.snapshot( checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint()); try { diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 708613bfea053..d95a9b41d9857 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.StateBackendTestBase; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -172,7 +173,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { @Test public void testRunningSnapshotAfterBackendClosed() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); RocksDB spyDB = keyedStateBackend.db; @@ -210,7 +211,7 @@ public void testRunningSnapshotAfterBackendClosed() throws Exception { @Test public void testReleasingSnapshotAfterBackendClosed() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); RocksDB spyDB = keyedStateBackend.db; @@ -239,7 +240,7 @@ public void testReleasingSnapshotAfterBackendClosed() throws Exception { @Test public void testDismissingSnapshot() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); snapshot.cancel(true); verifyRocksObjectsReleased(); } @@ -247,7 +248,7 @@ public void testDismissingSnapshot() throws Exception { @Test public void testDismissingSnapshotNotRunnable() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); snapshot.cancel(true); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); @@ -264,7 +265,7 @@ public void testDismissingSnapshotNotRunnable() throws Exception { @Test public void testCompletingSnapshot() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); waiter.await(); // wait for snapshot to run @@ -272,10 +273,10 @@ public void testCompletingSnapshot() throws Exception { runStateUpdates(); blocker.trigger(); // allow checkpointing to start writing waiter.await(); // wait for snapshot stream writing to run - KeyGroupsStateHandle keyGroupsStateHandle = snapshot.get(); - assertNotNull(keyGroupsStateHandle); - assertTrue(keyGroupsStateHandle.getStateSize() > 0); - assertEquals(2, keyGroupsStateHandle.getNumberOfKeyGroups()); + KeyedStateHandle keyedStateHandle = snapshot.get(); + assertNotNull(keyedStateHandle); + assertTrue(keyedStateHandle.getStateSize() > 0); + assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); assertTrue(testStreamFactory.getLastCreatedStream().isClosed()); asyncSnapshotThread.join(); verifyRocksObjectsReleased(); @@ -284,7 +285,7 @@ public void testCompletingSnapshot() throws Exception { @Test public void testCancelRunningSnapshot() throws Exception { setupRocksKeyedStateBackend(); - RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); waiter.await(); // wait for snapshot to run diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java index 65fa7335ab15d..39b6f30af343d 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java @@ -26,7 +26,7 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.watermark.Watermark; @@ -128,8 +128,8 @@ public Integer getKey(Event value) throws Exception { final OperatorStateHandles snapshot = new OperatorStateHandles( (int) ois.readObject(), (StreamStateHandle) ois.readObject(), - (Collection) ois.readObject(), - (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), (Collection) ois.readObject(), (Collection) ois.readObject() ); @@ -243,8 +243,8 @@ public Integer getKey(Event value) throws Exception { final OperatorStateHandles snapshot = new OperatorStateHandles( (int) ois.readObject(), (StreamStateHandle) ois.readObject(), - (Collection) ois.readObject(), - (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), (Collection) ois.readObject(), (Collection) ois.readObject() ); @@ -363,8 +363,8 @@ public Integer getKey(Event value) throws Exception { final OperatorStateHandles snapshot = new OperatorStateHandles( (int) ois.readObject(), (StreamStateHandle) ois.readObject(), - (Collection) ois.readObject(), - (Collection) ois.readObject(), + (Collection) ois.readObject(), + (Collection) ois.readObject(), (Collection) ois.readObject(), (Collection) ois.readObject() ); diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java index 9427f72015aec..a4e3a2e43a1a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationUtil.java @@ -19,17 +19,17 @@ package org.apache.flink.migration; import org.apache.flink.migration.state.MigrationKeyGroupStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import java.util.Collection; public class MigrationUtil { @SuppressWarnings("deprecation") - public static boolean isOldSavepointKeyedState(Collection keyGroupsStateHandles) { - return (keyGroupsStateHandles != null) - && (keyGroupsStateHandles.size() == 1) - && (keyGroupsStateHandles.iterator().next() instanceof MigrationKeyGroupStateHandle); + public static boolean isOldSavepointKeyedState(Collection keyedStateHandles) { + return (keyedStateHandles != null) + && (keyedStateHandles.size() == 1) + && (keyedStateHandles.iterator().next() instanceof MigrationKeyGroupStateHandle); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index 3fda430852648..310983e280f9a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; @@ -160,8 +161,8 @@ private static void assignTaskStatesToOperatorInstances( @SuppressWarnings("unchecked") List[] parallelOpStatesStream = new List[chainLength]; - List parallelKeyedStatesBackend = new ArrayList<>(oldParallelism); - List parallelKeyedStateStream = new ArrayList<>(oldParallelism); + List parallelKeyedStatesBackend = new ArrayList<>(oldParallelism); + List parallelKeyedStateStream = new ArrayList<>(oldParallelism); for (int p = 0; p < oldParallelism; ++p) { SubtaskState subtaskState = taskState.getState(p); @@ -173,12 +174,12 @@ private static void assignTaskStatesToOperatorInstances( collectParallelStatesByChainOperator( parallelOpStatesStream, subtaskState.getRawOperatorState()); - KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); + KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); if (null != keyedStateBackend) { parallelKeyedStatesBackend.add(keyedStateBackend); } - KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState(); + KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState(); if (null != keyedStateStream) { parallelKeyedStateStream.add(keyedStateStream); } @@ -252,13 +253,13 @@ private static void assignTaskStatesToOperatorInstances( .getTaskVertices()[subTaskIdx] .getCurrentExecutionAttempt(); - List newKeyedStatesBackend; - List newKeyedStateStream; + List newKeyedStatesBackend; + List newKeyedStateStream; if (oldParallelism == newParallelism) { SubtaskState subtaskState = taskState.getState(subTaskIdx); if (subtaskState != null) { - KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState(); - KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState(); + KeyedStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState(); + KeyedStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState(); newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList( oldKeyedStatesBackend) : null; newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList( @@ -269,8 +270,8 @@ private static void assignTaskStatesToOperatorInstances( } } else { KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx); - newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds); - newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds); + newKeyedStatesBackend = getKeyedStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds); + newKeyedStateStream = getKeyedStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds); } TaskStateHandles taskStateHandles = new TaskStateHandles( @@ -290,19 +291,19 @@ private static void assignTaskStatesToOperatorInstances( *

*

This is publicly visible to be used in tests. */ - public static List getKeyGroupsStateHandles( - Collection allKeyGroupsHandles, - KeyGroupRange subtaskKeyGroupIds) { + public static List getKeyedStateHandles( + Collection keyedStateHandles, + KeyGroupRange subtaskKeyGroupRange) { - List subtaskKeyGroupStates = new ArrayList<>(); + List subtaskKeyedStateHandles = new ArrayList<>(); - for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) { - KeyGroupsStateHandle intersection = storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds); - if (intersection.getNumberOfKeyGroups() > 0) { - subtaskKeyGroupStates.add(intersection); - } + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + KeyedStateHandle intersectedKeyedStateHandle = keyedStateHandle.getIntersection(subtaskKeyGroupRange); + + subtaskKeyedStateHandles.add(intersectedKeyedStateHandle); } - return subtaskKeyGroupStates; + + return subtaskKeyedStateHandles; } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java index 1393e32b1ed9c..9e195b116229c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; @@ -56,12 +56,12 @@ public class SubtaskState implements StateObject { /** * Snapshot from {@link org.apache.flink.runtime.state.KeyedStateBackend}. */ - private final KeyGroupsStateHandle managedKeyedState; + private final KeyedStateHandle managedKeyedState; /** * Snapshot written using {@link org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}. */ - private final KeyGroupsStateHandle rawKeyedState; + private final KeyedStateHandle rawKeyedState; /** * The state size. This is also part of the deserialized state handle. @@ -74,8 +74,8 @@ public SubtaskState( ChainedStateHandle legacyOperatorState, ChainedStateHandle managedOperatorState, ChainedStateHandle rawOperatorState, - KeyGroupsStateHandle managedKeyedState, - KeyGroupsStateHandle rawKeyedState) { + KeyedStateHandle managedKeyedState, + KeyedStateHandle rawKeyedState) { this.legacyOperatorState = checkNotNull(legacyOperatorState, "State"); this.managedOperatorState = managedOperatorState; @@ -114,11 +114,11 @@ public ChainedStateHandle getRawOperatorState() { return rawOperatorState; } - public KeyGroupsStateHandle getManagedKeyedState() { + public KeyedStateHandle getManagedKeyedState() { return managedKeyedState; } - public KeyGroupsStateHandle getRawKeyedState() { + public KeyedStateHandle getRawKeyedState() { return rawKeyedState; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index ba1949a54e826..44461d8b8e5a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; @@ -155,11 +156,11 @@ private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputS serializeOperatorStateHandle(stateHandle, dos); } - KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); - serializeKeyGroupStateHandle(keyedStateBackend, dos); + KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState(); + serializeKeyedStateHandle(keyedStateBackend, dos); - KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState(); - serializeKeyGroupStateHandle(keyedStateStream, dos); + KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState(); + serializeKeyedStateHandle(keyedStateStream, dos); } private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException { @@ -188,9 +189,9 @@ private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws operatorStateStream.add(streamStateHandle); } - KeyGroupsStateHandle keyedStateBackend = deserializeKeyGroupStateHandle(dis); + KeyedStateHandle keyedStateBackend = deserializeKeyedStateHandle(dis); - KeyGroupsStateHandle keyedStateStream = deserializeKeyGroupStateHandle(dis); + KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis); ChainedStateHandle nonPartitionableStateChain = new ChainedStateHandle<>(nonPartitionableState); @@ -209,23 +210,27 @@ private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws keyedStateStream); } - private static void serializeKeyGroupStateHandle( - KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException { + private static void serializeKeyedStateHandle( + KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException { + + if (stateHandle == null) { + dos.writeByte(NULL_HANDLE); + } else if (stateHandle instanceof KeyGroupsStateHandle) { + KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle; - if (stateHandle != null) { dos.writeByte(KEY_GROUPS_HANDLE); - dos.writeInt(stateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup()); - dos.writeInt(stateHandle.getNumberOfKeyGroups()); - for (int keyGroup : stateHandle.keyGroups()) { - dos.writeLong(stateHandle.getOffsetForKeyGroup(keyGroup)); + dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup()); + dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); + for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) { + dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup)); } - serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos); + serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos); } else { - dos.writeByte(NULL_HANDLE); + throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass()); } } - private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException { + private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException { final int type = dis.readByte(); if (NULL_HANDLE == type) { return null; @@ -237,11 +242,12 @@ private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStre for (int i = 0; i < numKeyGroups; ++i) { offsets[i] = dis.readLong(); } - KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets); + KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets( + keyGroupRange, offsets); StreamStateHandle stateHandle = deserializeStreamStateHandle(dis); return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); } else { - throw new IllegalStateException("Reading invalid KeyGroupsStateHandle, type: " + type); + throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index e6e7b239f7b95..e86f1f8ff67f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -61,7 +61,7 @@ * @param Type of the key by which state is keyed. */ public abstract class AbstractKeyedStateBackend - implements KeyedStateBackend, Snapshotable, Closeable { + implements KeyedStateBackend, Snapshotable, Closeable { /** {@link TypeSerializer} for our key. */ protected final TypeSerializer keySerializer; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index b454e42c757f8..bad7fd424df5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -29,7 +29,7 @@ * consists of a range of key group snapshots. A key group is subset of the available * key space. The key groups are identified by their key group indices. */ -public class KeyGroupsStateHandle implements StreamStateHandle { +public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle { private static final long serialVersionUID = -8070326169926626355L; @@ -54,20 +54,18 @@ public KeyGroupsStateHandle(KeyGroupRangeOffsets groupRangeOffsets, StreamStateH /** * - * @return iterable over the key-group range for the key-group state referenced by this handle + * @return the internal key-group range to offsets metadata */ - public Iterable keyGroups() { - return groupRangeOffsets.getKeyGroupRange(); + public KeyGroupRangeOffsets getGroupRangeOffsets() { + return groupRangeOffsets; } - /** * - * @param keyGroupId the id of a key-group - * @return true if the provided key-group id is contained in the key-group range of this handle + * @return The handle to the actual states */ - public boolean containsKeyGroup(int keyGroupId) { - return groupRangeOffsets.getKeyGroupRange().contains(keyGroupId); + public StreamStateHandle getDelegateStateHandle() { + return stateHandle; } /** @@ -85,24 +83,13 @@ public long getOffsetForKeyGroup(int keyGroupId) { * @return key-group state over a range that is the intersection between this handle's key-group range and the * provided key-group range. */ - public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange) { + public KeyGroupsStateHandle getIntersection(KeyGroupRange keyGroupRange) { return new KeyGroupsStateHandle(groupRangeOffsets.getIntersection(keyGroupRange), stateHandle); } - /** - * - * @return the internal key-group range to offsets metadata - */ - public KeyGroupRangeOffsets getGroupRangeOffsets() { - return groupRangeOffsets; - } - - /** - * - * @return number of key-groups in the key-group range of this handle - */ - public int getNumberOfKeyGroups() { - return groupRangeOffsets.getKeyGroupRange().getNumberOfKeyGroups(); + @Override + public KeyGroupRange getKeyGroupRange() { + return groupRangeOffsets.getKeyGroupRange(); } @Override @@ -120,10 +107,6 @@ public FSDataInputStream openInputStream() throws IOException { return stateHandle.openInputStream(); } - public StreamStateHandle getDelegateStateHandle() { - return stateHandle; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java new file mode 100644 index 0000000000000..b4fa91f139c55 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +/** + * Base for the handles of the checkpointed states in keyed streams. When + * recovering from failures, the handle will be passed to all tasks whose key + * group ranges overlap with it. + */ +public interface KeyedStateHandle extends StateObject { + + /** + * Returns the range of the key groups contained in the state. + */ + KeyGroupRange getKeyGroupRange(); + + /** + * Returns a keyed state handle which contains the states for the given + * key groups. + * + * @param keyGroupRange The key group range to intersect with + */ + KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java index 886d214be1346..d82af7217a7cd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java @@ -27,9 +27,11 @@ import org.apache.flink.util.Preconditions; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; /** @@ -55,7 +57,7 @@ public StateInitializationContextImpl( boolean restored, OperatorStateStore operatorStateStore, KeyedStateStore keyedStateStore, - Collection keyGroupsStateHandles, + Collection keyedStateHandles, Collection operatorStateHandles, CloseableRegistry closableRegistry) { @@ -64,7 +66,7 @@ public StateInitializationContextImpl( this.operatorStateStore = operatorStateStore; this.keyedStateStore = keyedStateStore; this.operatorStateHandles = operatorStateHandles; - this.keyGroupsStateHandles = keyGroupsStateHandles; + this.keyGroupsStateHandles = transform(keyedStateHandles); this.keyedStateIterable = keyGroupsStateHandles == null ? null @@ -136,6 +138,26 @@ public void close() { IOUtils.closeQuietly(closableRegistry); } + private static Collection transform(Collection keyedStateHandles) { + if (keyedStateHandles == null) { + return null; + } + + List keyGroupsStateHandles = new ArrayList<>(); + + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + if (! (keyedStateHandle instanceof KeyGroupsStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass() + "."); + } + + keyGroupsStateHandles.add((KeyGroupsStateHandle) keyedStateHandle); + } + + return keyGroupsStateHandles; + } + private static class KeyGroupStreamIterator extends AbstractStateStreamIterator { @@ -159,7 +181,7 @@ public boolean hasNext() { while (stateHandleIterator.hasNext()) { currentStateHandle = stateHandleIterator.next(); - if (currentStateHandle.getNumberOfKeyGroups() > 0) { + if (currentStateHandle.getKeyGroupRange().getNumberOfKeyGroups() > 0) { currentOffsetsIterator = currentStateHandle.getGroupRangeOffsets().iterator(); return true; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java index 417a9dd1fcb78..450413a695bbd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java @@ -40,10 +40,10 @@ public class TaskStateHandles implements Serializable { private final ChainedStateHandle legacyOperatorState; /** Collection of handles which represent the managed keyed state of the head operator */ - private final Collection managedKeyedState; + private final Collection managedKeyedState; /** Collection of handles which represent the raw/streamed keyed state of the head operator */ - private final Collection rawKeyedState; + private final Collection rawKeyedState; /** Outer list represents the operator chain, each collection holds handles for managed state of a single operator */ private final List> managedOperatorState; @@ -67,8 +67,8 @@ public TaskStateHandles( ChainedStateHandle legacyOperatorState, List> managedOperatorState, List> rawOperatorState, - Collection managedKeyedState, - Collection rawKeyedState) { + Collection managedKeyedState, + Collection rawKeyedState) { this.legacyOperatorState = legacyOperatorState; this.managedKeyedState = managedKeyedState; @@ -82,11 +82,11 @@ public ChainedStateHandle getLegacyOperatorState() { return legacyOperatorState; } - public Collection getManagedKeyedState() { + public Collection getManagedKeyedState() { return managedKeyedState; } - public Collection getRawKeyedState() { + public Collection getRawKeyedState() { return rawKeyedState; } @@ -110,8 +110,8 @@ private static List> transform(ChainedStateHandl return out; } - private static List transform(KeyGroupsStateHandle in) { - return in == null ? Collections.emptyList() : Collections.singletonList(in); + private static List transform(T in) { + return in == null ? Collections.emptyList() : Collections.singletonList(in); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 46ec5c2330936..1c06c2c2bc3c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -39,6 +39,7 @@ import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot; import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; +import org.apache.flink.migration.state.MigrationKeyGroupStateHandle; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; @@ -50,6 +51,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; @@ -223,7 +225,7 @@ public InternalMapState createMapState(TypeSerializer @Override @SuppressWarnings("unchecked") - public RunnableFuture snapshot( + public RunnableFuture snapshot( final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, @@ -267,8 +269,8 @@ public RunnableFuture snapshot( //--------------------------------------------------- this becomes the end of sync part // implementation of the async IO operation, based on FutureTask - final AbstractAsyncIOCallable ioCallable = - new AbstractAsyncIOCallable() { + final AbstractAsyncIOCallable ioCallable = + new AbstractAsyncIOCallable() { AtomicBoolean open = new AtomicBoolean(false); @@ -340,7 +342,7 @@ public void done(boolean canceled) { } }; - AsyncStoppableTaskWithCallback task = AsyncStoppableTaskWithCallback.from(ioCallable); + AsyncStoppableTaskWithCallback task = AsyncStoppableTaskWithCallback.from(ioCallable); if (!asynchronousSnapshots) { task.run(); @@ -354,7 +356,7 @@ public void done(boolean canceled) { @SuppressWarnings("deprecation") @Override - public void restore(Collection restoredState) throws Exception { + public void restore(Collection restoredState) throws Exception { LOG.info("Initializing heap keyed state backend from snapshot."); if (LOG.isDebugEnabled()) { @@ -369,19 +371,26 @@ public void restore(Collection restoredState) throws Excep } @SuppressWarnings({"unchecked"}) - private void restorePartitionedState(Collection state) throws Exception { + private void restorePartitionedState(Collection state) throws Exception { final Map kvStatesById = new HashMap<>(); int numRegisteredKvStates = 0; stateTables.clear(); - for (KeyGroupsStateHandle keyGroupsHandle : state) { + for (KeyedStateHandle keyedStateHandle : state) { - if (keyGroupsHandle == null) { + if (keyedStateHandle == null) { continue; } - FSDataInputStream fsDataInputStream = keyGroupsHandle.openInputStream(); + if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + KeyGroupsStateHandle.class + + ", but found: " + keyedStateHandle.getClass()); + } + + KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; + FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream(); cancelStreamRegistry.registerClosable(fsDataInputStream); try { @@ -412,9 +421,15 @@ private void restorePartitionedState(Collection state) thr } } - for (Tuple2 groupOffset : keyGroupsHandle.getGroupRangeOffsets()) { + for (Tuple2 groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) { int keyGroupIndex = groupOffset.f0; long offset = groupOffset.f1; + + // Skip those key groups that don't belong to the backend. + if (!keyGroupRange.contains(keyGroupIndex)) { + continue; + } + fsDataInputStream.seek(offset); int writtenKeyGroupIndex = inView.readInt(); @@ -449,7 +464,7 @@ public String toString() { @SuppressWarnings({"unchecked", "rawtypes", "DeprecatedIsStillUsed"}) @Deprecated private void restoreOldSavepointKeyedState( - Collection stateHandles) throws IOException, ClassNotFoundException { + Collection stateHandles) throws IOException, ClassNotFoundException { if (stateHandles.isEmpty()) { return; @@ -457,8 +472,17 @@ private void restoreOldSavepointKeyedState( Preconditions.checkState(1 == stateHandles.size(), "Only one element expected here."); + KeyedStateHandle keyedStateHandle = stateHandles.iterator().next(); + if (!(keyedStateHandle instanceof MigrationKeyGroupStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected: " + MigrationKeyGroupStateHandle.class + + ", but found " + keyedStateHandle.getClass()); + } + + MigrationKeyGroupStateHandle keyGroupStateHandle = (MigrationKeyGroupStateHandle) keyedStateHandle; + HashMap> namedStates; - try (FSDataInputStream inputStream = stateHandles.iterator().next().openInputStream()) { + try (FSDataInputStream inputStream = keyGroupStateHandle.openInputStream()) { namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index d8bba59c165ca..8ed5c73e674dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; @@ -57,6 +58,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import javax.swing.plaf.basic.BasicSplitPaneUI; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -2346,13 +2348,13 @@ private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean s ChainedStateHandle operatorState = taskStateHandles.getLegacyOperatorState(); List> opStateBackend = taskStateHandles.getManagedOperatorState(); List> opStateRaw = taskStateHandles.getRawOperatorState(); - Collection keyGroupStateBackend = taskStateHandles.getManagedKeyedState(); - Collection keyGroupStateRaw = taskStateHandles.getRawKeyedState(); + Collection keyedStateBackend = taskStateHandles.getManagedKeyedState(); + Collection keyGroupStateRaw = taskStateHandles.getRawKeyedState(); actualOpStatesBackend.add(opStateBackend); actualOpStatesRaw.add(opStateRaw); assertNull(operatorState); - compareKeyedState(Collections.singletonList(originalKeyedStateBackend), keyGroupStateBackend); + compareKeyedState(Collections.singletonList(originalKeyedStateBackend), keyedStateBackend); compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw); } comparePartitionableState(expectedOpStatesBackend, actualOpStatesBackend); @@ -2690,32 +2692,38 @@ public static void verifyStateRestore( KeyGroupsStateHandle expectPartitionedKeyGroupState = generateKeyGroupState( jobVertexID, keyGroupPartitions.get(i), false); - Collection actualPartitionedKeyGroupState = taskStateHandles.getManagedKeyedState(); + Collection actualPartitionedKeyGroupState = taskStateHandles.getManagedKeyedState(); compareKeyedState(Collections.singletonList(expectPartitionedKeyGroupState), actualPartitionedKeyGroupState); } } public static void compareKeyedState( Collection expectPartitionedKeyGroupState, - Collection actualPartitionedKeyGroupState) throws Exception { + Collection actualPartitionedKeyGroupState) throws Exception { KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = expectPartitionedKeyGroupState.iterator().next(); - int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getNumberOfKeyGroups(); + int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups(); int actualTotalKeyGroups = 0; - for(KeyGroupsStateHandle keyGroupsStateHandle: actualPartitionedKeyGroupState) { - actualTotalKeyGroups += keyGroupsStateHandle.getNumberOfKeyGroups(); + for(KeyedStateHandle keyedStateHandle: actualPartitionedKeyGroupState) { + assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle); + + actualTotalKeyGroups += keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups(); } assertEquals(expectedTotalKeyGroups, actualTotalKeyGroups); try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.openInputStream()) { - for (int groupId : expectedHeadOpKeyGroupStateHandle.keyGroups()) { + for (int groupId : expectedHeadOpKeyGroupStateHandle.getKeyGroupRange()) { long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId); inputStream.seek(offset); int expectedKeyGroupState = InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader()); - for (KeyGroupsStateHandle oneActualKeyGroupStateHandle : actualPartitionedKeyGroupState) { - if (oneActualKeyGroupStateHandle.containsKeyGroup(groupId)) { + for (KeyedStateHandle oneActualKeyedStateHandle : actualPartitionedKeyGroupState) { + + assertTrue(oneActualKeyedStateHandle instanceof KeyGroupsStateHandle); + + KeyGroupsStateHandle oneActualKeyGroupStateHandle = (KeyGroupsStateHandle) oneActualKeyedStateHandle; + if (oneActualKeyGroupStateHandle.getKeyGroupRange().contains(groupId)) { long actualOffset = oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId); try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.openInputStream()) { actualInputStream.seek(actualOffset); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 18b07eb0f9e6d..7e0a7c1859cc6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; @@ -68,7 +69,7 @@ public void testSetState() { final ChainedStateHandle serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new SerializableObject()); KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0); List testStates = Collections.singletonList(new SerializableObject()); - final KeyGroupsStateHandle serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates); + final KeyedStateHandle serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates); final JobID jid = new JobID(); final JobVertexID statefulId = new JobVertexID(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java index 6ab86206a3034..1ecb2e3a0c0fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -58,6 +59,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; @SuppressWarnings("deprecation") public class MigrationV0ToV1Test { @@ -154,9 +156,15 @@ public void testSavepointMigrationV0ToV1() throws Exception { } //check keyed state - KeyGroupsStateHandle keyGroupsStateHandle = subtaskState.getManagedKeyedState(); + KeyedStateHandle keyedStateHandle = subtaskState.getManagedKeyedState(); + if (t % 3 != 0) { - assertEquals(1, keyGroupsStateHandle.getNumberOfKeyGroups()); + + assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle); + + KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; + + assertEquals(1, keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); assertEquals(p, keyGroupsStateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup()); ByteStreamStateHandle stateHandle = @@ -172,7 +180,7 @@ public void testSavepointMigrationV0ToV1() throws Exception { assertEquals(p, data[1]); } } else { - assertEquals(null, keyGroupsStateHandle); + assertEquals(null, keyedStateHandle); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java index 0c4ed742406bf..cee0b024f0c42 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyedStateCheckpointOutputStreamTest.java @@ -135,7 +135,7 @@ public void testReadWriteMissingKeyGroups() throws Exception { int count = 0; try (FSDataInputStream in = fullHandle.openInputStream()) { DataInputView div = new DataInputViewStreamWrapper(in); - for (int kg : fullHandle.keyGroups()) { + for (int kg : fullHandle.getKeyGroupRange()) { long off = fullHandle.getOffsetForKeyGroup(kg); if (off >= 0) { in.seek(off); @@ -152,7 +152,7 @@ private static void verifyRead(KeyGroupsStateHandle fullHandle, KeyGroupRange ke int count = 0; try (FSDataInputStream in = fullHandle.openInputStream()) { DataInputView div = new DataInputViewStreamWrapper(in); - for (int kg : fullHandle.keyGroups()) { + for (int kg : fullHandle.getKeyGroupRange()) { long off = fullHandle.getOffsetForKeyGroup(kg); in.seek(off); Assert.assertEquals(kg, div.readInt()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 22bb7153dd3ac..ccc1eaee3ea0a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -143,13 +143,13 @@ protected AbstractKeyedStateBackend createKeyedBackend( env.getTaskKvStateRegistry()); } - protected AbstractKeyedStateBackend restoreKeyedBackend(TypeSerializer keySerializer, KeyGroupsStateHandle state) throws Exception { + protected AbstractKeyedStateBackend restoreKeyedBackend(TypeSerializer keySerializer, KeyedStateHandle state) throws Exception { return restoreKeyedBackend(keySerializer, state, new DummyEnvironment("test", 1, 0)); } protected AbstractKeyedStateBackend restoreKeyedBackend( TypeSerializer keySerializer, - KeyGroupsStateHandle state, + KeyedStateHandle state, Environment env) throws Exception { return restoreKeyedBackend( keySerializer, @@ -163,7 +163,7 @@ protected AbstractKeyedStateBackend restoreKeyedBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - List state, + List state, Environment env) throws Exception { AbstractKeyedStateBackend backend = getStateBackend().createKeyedStateBackend( @@ -436,7 +436,7 @@ public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exce backend.setCurrentKey(2); state.update(new TestPojo("u2", 2)); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, @@ -497,7 +497,7 @@ public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws E backend.setCurrentKey(2); state.update(new TestPojo("u2", 2)); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, @@ -524,7 +524,7 @@ public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws E // update to test state backends that eagerly serialize, such as RocksDB state.update(new TestPojo("u1", 11)); - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, @@ -585,7 +585,7 @@ public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throw backend.setCurrentKey(2); state.update(new TestPojo("u2", 2)); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, @@ -611,7 +611,7 @@ public void testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throw // update to test state backends that eagerly serialize, such as RocksDB state.update(new TestPojo("u1", 11)); - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot( + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot( 682375462378L, 2, streamFactory, @@ -670,7 +670,7 @@ public void testValueState() throws Exception { assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -681,7 +681,7 @@ public void testValueState() throws Exception { state.update("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -880,7 +880,7 @@ public void testMultipleValueStates() throws Exception { assertEquals(13, (int) state2.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); backend = restoreKeyedBackend( @@ -952,7 +952,7 @@ public void testValueStateNullUpdate() throws Exception { assertEquals(42L, (long) state.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); @@ -997,7 +997,7 @@ public void testListState() throws Exception { assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -1008,7 +1008,7 @@ public void testListState() throws Exception { state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1091,7 +1091,7 @@ public void testReducingState() throws Exception { assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -1102,7 +1102,7 @@ public void testReducingState() throws Exception { state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1188,7 +1188,7 @@ public void testFoldingState() throws Exception { assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -1200,7 +1200,7 @@ public void testFoldingState() throws Exception { state.add(103); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1287,7 +1287,7 @@ public void testMapState() throws Exception { getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -1299,7 +1299,7 @@ public void testMapState() throws Exception { state.putAll(new HashMap() {{ put(1031, "1031"); put(1032, "1032"); }}); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1606,13 +1606,13 @@ public void testKeyGroupSnapshotRestore() throws Exception { state.update("ShouldBeInSecondHalf"); - KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint())); - List firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles( + List firstHalfKeyGroupStates = StateAssignmentOperation.getKeyedStateHandles( Collections.singletonList(snapshot), KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 2, 0)); - List secondHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles( + List secondHalfKeyGroupStates = StateAssignmentOperation.getKeyedStateHandles( Collections.singletonList(snapshot), KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 2, 1)); @@ -1672,7 +1672,7 @@ public void testValueStateRestoreWithWrongSerializers() { state.update("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1723,7 +1723,7 @@ public void testListStateRestoreWithWrongSerializers() { state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1776,7 +1776,7 @@ public void testReducingStateRestoreWithWrongSerializers() { state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1827,7 +1827,7 @@ public void testMapStateRestoreWithWrongSerializers() { state.put("2", "Second"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -2093,7 +2093,7 @@ public void testQueryableStateRegistration() throws Exception { eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class)); - KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); @@ -2124,7 +2124,7 @@ public void testEmptyStateCheckpointing() { ListStateDescriptor kvId = new ListStateDescriptor<>("id", String.class); // draw a snapshot - KeyGroupsStateHandle snapshot = + KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint())); assertNull(snapshot); backend.dispose(); @@ -2152,7 +2152,7 @@ public void testAsyncSnapshot() throws Exception { streamFactory.setWaiterLatch(waiter); AbstractKeyedStateBackend backend = null; - KeyGroupsStateHandle stateHandle = null; + KeyedStateHandle stateHandle = null; try { backend = createKeyedBackend(IntSerializer.INSTANCE); @@ -2167,7 +2167,7 @@ public void testAsyncSnapshot() throws Exception { valueState.update(i); } - RunnableFuture snapshot = + RunnableFuture snapshot = backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint()); Thread runner = new Thread(snapshot); runner.start(); @@ -2249,7 +2249,7 @@ public void testAsyncSnapshotCancellation() throws Exception { valueState.update(i); } - RunnableFuture snapshot = + RunnableFuture snapshot = backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint()); Thread runner = new Thread(snapshot); @@ -2367,7 +2367,7 @@ private static Map getSerializedMap( } } - private KeyGroupsStateHandle runSnapshot(RunnableFuture snapshotRunnableFuture) throws Exception { + private KeyedStateHandle runSnapshot(RunnableFuture snapshotRunnableFuture) throws Exception { if(!snapshotRunnableFuture.isDone()) { Thread runner = new Thread(snapshotRunnableFuture); runner.start(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java index da0666a95324c..3754d6399f882 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -63,7 +64,7 @@ public void testRestore1_2ToMaster() throws Exception { try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) { stateHandle = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader()); } - keyedBackend.restore(Collections.singleton(stateHandle)); + keyedBackend.restore(Collections.singleton(stateHandle)); final ListStateDescriptor stateDescr = new ListStateDescriptor<>("my-state", Long.class); stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index e40a59b1988e6..7f209ca46826e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -47,9 +47,9 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyGroupsList; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateInitializationContext; @@ -76,8 +76,6 @@ import java.util.HashMap; import java.util.Map; -import static org.apache.flink.util.Preconditions.checkArgument; - /** * Base class for all stream operators. Operators that contain a user function should extend the class * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class). @@ -198,7 +196,7 @@ public MetricGroup getMetricGroup() { @Override public final void initializeState(OperatorStateHandles stateHandles) throws Exception { - Collection keyedStateHandlesRaw = null; + Collection keyedStateHandlesRaw = null; Collection operatorStateHandlesRaw = null; Collection operatorStateHandlesBackend = null; @@ -473,12 +471,13 @@ public void initializeState(StateInitializationContext context) throws Exception // and then initialize the timer services for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) { int keyGroupIdx = streamProvider.getKeyGroupId(); - checkArgument(localKeyGroupRange.contains(keyGroupIdx), - "Key Group " + keyGroupIdx + " does not belong to the local range."); - timeServiceManager.restoreStateForKeyGroup( - new DataInputViewStreamWrapper(streamProvider.getStream()), - keyGroupIdx, getUserCodeClassloader()); + if (localKeyGroupRange.contains(keyGroupIdx)) { + timeServiceManager.restoreStateForKeyGroup( + new DataInputViewStreamWrapper(streamProvider.getStream()), + keyGroupIdx, getUserCodeClassloader() + ); + } } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index b1c94cb5ebe65..96c91e8ecdad6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.util.ExceptionUtils; @@ -30,7 +31,7 @@ */ public class OperatorSnapshotResult { - private RunnableFuture keyedStateManagedFuture; + private RunnableFuture keyedStateManagedFuture; private RunnableFuture keyedStateRawFuture; private RunnableFuture operatorStateManagedFuture; private RunnableFuture operatorStateRawFuture; @@ -40,7 +41,7 @@ public OperatorSnapshotResult() { } public OperatorSnapshotResult( - RunnableFuture keyedStateManagedFuture, + RunnableFuture keyedStateManagedFuture, RunnableFuture keyedStateRawFuture, RunnableFuture operatorStateManagedFuture, RunnableFuture operatorStateRawFuture) { @@ -50,11 +51,11 @@ public OperatorSnapshotResult( this.operatorStateRawFuture = operatorStateRawFuture; } - public RunnableFuture getKeyedStateManagedFuture() { + public RunnableFuture getKeyedStateManagedFuture() { return keyedStateManagedFuture; } - public void setKeyedStateManagedFuture(RunnableFuture keyedStateManagedFuture) { + public void setKeyedStateManagedFuture(RunnableFuture keyedStateManagedFuture) { this.keyedStateManagedFuture = keyedStateManagedFuture; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java index 7abf8d99187d1..30d07b73bf7eb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java @@ -21,7 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; @@ -42,16 +42,16 @@ public class OperatorStateHandles { private final StreamStateHandle legacyOperatorState; - private final Collection managedKeyedState; - private final Collection rawKeyedState; + private final Collection managedKeyedState; + private final Collection rawKeyedState; private final Collection managedOperatorState; private final Collection rawOperatorState; public OperatorStateHandles( int operatorChainIndex, StreamStateHandle legacyOperatorState, - Collection managedKeyedState, - Collection rawKeyedState, + Collection managedKeyedState, + Collection rawKeyedState, Collection managedOperatorState, Collection rawOperatorState) { @@ -83,11 +83,11 @@ public StreamStateHandle getLegacyOperatorState() { return legacyOperatorState; } - public Collection getManagedKeyedState() { + public Collection getManagedKeyedState() { return managedKeyedState; } - public Collection getRawKeyedState() { + public Collection getRawKeyedState() { return rawKeyedState; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 76b2b98afe0e8..e55c84abfceb6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackend; @@ -849,7 +850,7 @@ private static final class AsyncCheckpointRunnable implements Runnable, Closeabl private final List snapshotInProgressList; - private RunnableFuture futureKeyedBackendStateHandles; + private RunnableFuture futureKeyedBackendStateHandles; private RunnableFuture futureKeyedStreamStateHandles; private List nonPartitionedStateHandles; @@ -892,7 +893,7 @@ private static final class AsyncCheckpointRunnable implements Runnable, Closeabl public void run() { try { // Keyed state handle future, currently only one (the head) operator can have this - KeyGroupsStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles); + KeyedStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles); KeyGroupsStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles); List operatorStatesBackend = new ArrayList<>(snapshotInProgressList.size()); @@ -987,7 +988,7 @@ private SubtaskState createSubtaskStateFromSnapshotStateHandles( ChainedStateHandle chainedNonPartitionedOperatorsState, ChainedStateHandle chainedOperatorStateBackend, ChainedStateHandle chainedOperatorStateStream, - KeyGroupsStateHandle keyedStateHandleBackend, + KeyedStateHandle keyedStateHandleBackend, KeyGroupsStateHandle keyedStateHandleStream) { boolean hasAnyState = keyedStateHandleBackend != null diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java index 490df52d868c1..8f34c072e81e6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -41,8 +42,8 @@ public void testCancelAndCleanup() throws Exception { operatorSnapshotResult.cancel(); - KeyGroupsStateHandle keyedManagedStateHandle = mock(KeyGroupsStateHandle.class); - RunnableFuture keyedStateManagedFuture = mock(RunnableFuture.class); + KeyedStateHandle keyedManagedStateHandle = mock(KeyedStateHandle.class); + RunnableFuture keyedStateManagedFuture = mock(RunnableFuture.class); when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle); KeyGroupsStateHandle keyedRawStateHandle = mock(KeyGroupsStateHandle.class); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index 963c42c791b1a..8e0edfcab9d69 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.StatePartitionStreamProvider; @@ -75,7 +76,7 @@ public void setUp() throws Exception { ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64); - List keyGroupsStateHandles = new ArrayList<>(NUM_HANDLES); + List keyedStateHandles = new ArrayList<>(NUM_HANDLES); int prev = 0; for (int i = 0; i < NUM_HANDLES; ++i) { out.reset(); @@ -91,10 +92,10 @@ public void setUp() throws Exception { ++writtenKeyGroups; } - KeyGroupsStateHandle handle = + KeyedStateHandle handle = new KeyGroupsStateHandle(offsets, new ByteStateHandleCloseChecking("kg-" + i, out.toByteArray())); - keyGroupsStateHandles.add(handle); + keyedStateHandles.add(handle); } List operatorStateHandles = new ArrayList<>(NUM_HANDLES); @@ -125,7 +126,7 @@ public void setUp() throws Exception { true, stateStore, mock(KeyedStateStore.class), - keyGroupsStateHandles, + keyedStateHandles, operatorStateHandles, closableRegistry); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 58cfefd556c0a..44352476e8657 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StreamStateHandle; @@ -186,8 +187,8 @@ private static Task createTask( ChainedStateHandle operatorState = null; - List keyGroupStateFromBackend = Collections.emptyList(); - List keyGroupStateFromStream = Collections.emptyList(); + List keyedStateFromBackend = Collections.emptyList(); + List keyedStateFromStream = Collections.emptyList(); List> operatorStateBackend = Collections.emptyList(); List> operatorStateStream = Collections.emptyList(); @@ -201,8 +202,8 @@ private static Task createTask( Collection operatorStateHandles = Collections.singletonList(new OperatorStateHandle(operatorStateMetadata, state)); - List keyGroupsStateHandles = - Collections.singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state)); + List keyedStateHandles = + Collections.singletonList(new KeyGroupsStateHandle(keyGroupRangeOffsets, state)); switch (mode) { case OPERATOR_MANAGED: @@ -212,10 +213,10 @@ private static Task createTask( operatorStateStream = Collections.singletonList(operatorStateHandles); break; case KEYED_MANAGED: - keyGroupStateFromBackend = keyGroupsStateHandles; + keyedStateFromBackend = keyedStateHandles; break; case KEYED_RAW: - keyGroupStateFromStream = keyGroupsStateHandles; + keyedStateFromStream = keyedStateHandles; break; case LEGACY: operatorState = new ChainedStateHandle<>(Collections.singletonList(state)); @@ -228,8 +229,8 @@ private static Task createTask( operatorState, operatorStateBackend, operatorStateStream, - keyGroupStateFromBackend, - keyGroupStateFromStream); + keyedStateFromBackend, + keyedStateFromStream); JobInformation jobInformation = new JobInformation( new JobID(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index d7e3d6cbde700..065ef6fdbad16 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -62,6 +62,7 @@ import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackendFactory; @@ -458,7 +459,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { StreamOperator streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); - KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class); + KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class); KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class); OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class); OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class); @@ -574,7 +575,7 @@ public SubtaskState answer(InvocationOnMock invocation) throws Throwable { StreamOperator streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); - KeyGroupsStateHandle managedKeyedStateHandle = mock(KeyGroupsStateHandle.class); + KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class); KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class); OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class); OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 945103c49ead4..912d579ce764e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; @@ -318,7 +319,7 @@ public void initializeStateFromLegacyCheckpoint(String checkpointFilename) throw StreamStateHandle stateHandle = SavepointV0Serializer.convertOperatorAndFunctionState(state); - List keyGroupStatesList = new ArrayList<>(); + List keyGroupStatesList = new ArrayList<>(); if (state.getKvStates() != null) { KeyGroupsStateHandle keyedStateHandle = SavepointV0Serializer.convertKeyedBackendState( state.getKvStates(), @@ -331,7 +332,7 @@ public void initializeStateFromLegacyCheckpoint(String checkpointFilename) throw initializeState(new OperatorStateHandles(0, stateHandle, keyGroupStatesList, - Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); } @@ -364,16 +365,16 @@ public void initializeState(OperatorStateHandles operatorStateHandles) throws Ex KeyGroupRange localKeyGroupRange = keyGroupPartitions.get(subtaskIndex); - List localManagedKeyGroupState = null; + List localManagedKeyGroupState = null; if (operatorStateHandles.getManagedKeyedState() != null) { - localManagedKeyGroupState = StateAssignmentOperation.getKeyGroupsStateHandles( + localManagedKeyGroupState = StateAssignmentOperation.getKeyedStateHandles( operatorStateHandles.getManagedKeyedState(), localKeyGroupRange); } - List localRawKeyGroupState = null; + List localRawKeyGroupState = null; if (operatorStateHandles.getRawKeyedState() != null) { - localRawKeyGroupState = StateAssignmentOperation.getKeyGroupsStateHandles( + localRawKeyGroupState = StateAssignmentOperation.getKeyedStateHandles( operatorStateHandles.getRawKeyedState(), localKeyGroupRange); } @@ -442,15 +443,15 @@ public static OperatorStateHandles repackageState(OperatorStateHandles... handle List mergedManagedOperatorState = new ArrayList<>(handles.length); List mergedRawOperatorState = new ArrayList<>(handles.length); - List mergedManagedKeyedState = new ArrayList<>(handles.length); - List mergedRawKeyedState = new ArrayList<>(handles.length); + List mergedManagedKeyedState = new ArrayList<>(handles.length); + List mergedRawKeyedState = new ArrayList<>(handles.length); for (OperatorStateHandles handle: handles) { Collection managedOperatorState = handle.getManagedOperatorState(); Collection rawOperatorState = handle.getRawOperatorState(); - Collection managedKeyedState = handle.getManagedKeyedState(); - Collection rawKeyedState = handle.getRawKeyedState(); + Collection managedKeyedState = handle.getManagedKeyedState(); + Collection rawKeyedState = handle.getRawKeyedState(); if (managedOperatorState != null) { mergedManagedOperatorState.addAll(managedOperatorState); @@ -502,8 +503,8 @@ public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws E timestamp, CheckpointOptions.forFullCheckpoint()); - KeyGroupsStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture()); - KeyGroupsStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture()); + KeyedStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture()); + KeyedStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture()); OperatorStateHandle opManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateManagedFuture()); OperatorStateHandle opRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getOperatorStateRawFuture()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index d45ae2126985a..d9c73874def4d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -65,7 +66,7 @@ public class KeyedOneInputStreamOperatorTestHarness // when we restore we keep the state here so that we can call restore // when the operator requests the keyed state backend - private List restoredKeyedState = null; + private List restoredKeyedState = null; public KeyedOneInputStreamOperatorTestHarness( OneInputStreamOperator operator, @@ -144,7 +145,7 @@ public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throw } if (keyedStateBackend != null) { - RunnableFuture keyedSnapshotRunnable = keyedStateBackend.snapshot( + RunnableFuture keyedSnapshotRunnable = keyedStateBackend.snapshot( checkpointId, timestamp, streamFactory, @@ -177,14 +178,14 @@ public void restore(StreamStateHandle snapshot) throws Exception { byte keyedStatePresent = (byte) inStream.read(); if (keyedStatePresent == 1) { ObjectInputStream ois = new ObjectInputStream(inStream); - this.restoredKeyedState = Collections.singletonList((KeyGroupsStateHandle) ois.readObject()); + this.restoredKeyedState = Collections.singletonList((KeyedStateHandle) ois.readObject()); } } } - private static boolean hasMigrationHandles(Collection allKeyGroupsHandles) { - for (KeyGroupsStateHandle handle : allKeyGroupsHandles) { + private static boolean hasMigrationHandles(Collection allKeyGroupsHandles) { + for (KeyedStateHandle handle : allKeyGroupsHandles) { if (handle instanceof Migration) { return true; } @@ -225,17 +226,17 @@ public void initializeState(OperatorStateHandles operatorStateHandles) throws Ex keyGroupPartitions.get(subtaskIndex); restoredKeyedState = null; - Collection managedKeyedState = operatorStateHandles.getManagedKeyedState(); + Collection managedKeyedState = operatorStateHandles.getManagedKeyedState(); if (managedKeyedState != null) { // if we have migration handles, don't reshuffle state and preserve // the migration tag if (hasMigrationHandles(managedKeyedState)) { - List result = new ArrayList<>(managedKeyedState.size()); + List result = new ArrayList<>(managedKeyedState.size()); result.addAll(managedKeyedState); restoredKeyedState = result; } else { - restoredKeyedState = StateAssignmentOperation.getKeyGroupsStateHandles( + restoredKeyedState = StateAssignmentOperation.getKeyedStateHandles( managedKeyedState, localKeyGroupRange); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java index 8e76f7065f4e8..41a083a69fcb7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.mockito.invocation.InvocationOnMock; @@ -50,7 +51,7 @@ public class KeyedTwoInputStreamOperatorTestHarness // when we restore we keep the state here so that we can call restore // when the operator requests the keyed state backend - private Collection restoredKeyedState = null; + private Collection restoredKeyedState = null; public KeyedTwoInputStreamOperatorTestHarness( TwoInputStreamOperator operator, From 7aea3b7fadc7974770554d4ce6036bad12b949f5 Mon Sep 17 00:00:00 2001 From: "xiaogang.sxg" Date: Tue, 28 Mar 2017 13:36:06 +0800 Subject: [PATCH 2/2] Change the type returned by the future of raw keyed state to KeyedStateHandle --- .../streaming/state/RocksDBKeyedStateBackend.java | 7 +++---- .../runtime/checkpoint/StateAssignmentOperation.java | 4 +++- .../apache/flink/runtime/state/KeyedStateHandle.java | 4 ++-- .../state/StateSnapshotContextSynchronousImpl.java | 12 +++++++----- .../runtime/state/heap/HeapKeyedStateBackend.java | 6 ++---- .../checkpoint/CheckpointCoordinatorTest.java | 1 - .../api/operators/OperatorSnapshotResult.java | 9 ++++----- .../flink/streaming/runtime/tasks/StreamTask.java | 7 +++---- .../api/operators/AbstractStreamOperatorTest.java | 10 +++++----- .../api/operators/OperatorSnapshotResultTest.java | 5 ++--- .../streaming/runtime/tasks/StreamTaskTest.java | 9 ++++----- 11 files changed, 35 insertions(+), 39 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 89e4153590bab..0407070cc1cf7 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -771,10 +771,9 @@ private void restoreKVStateData() throws IOException, RocksDBException { for (Tuple2 keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { int keyGroup = keyGroupOffset.f0; - // Skip those key groups that do not belong to the backend - if (!rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup)) { - continue; - } + // Check that restored key groups all belong to the backend + Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), + "The key group must belong to the backend"); long offset = keyGroupOffset.f1; //not empty key-group? diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index 310983e280f9a..ac70e1aefc088 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -300,7 +300,9 @@ public static List getKeyedStateHandles( for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { KeyedStateHandle intersectedKeyedStateHandle = keyedStateHandle.getIntersection(subtaskKeyGroupRange); - subtaskKeyedStateHandles.add(intersectedKeyedStateHandle); + if (intersectedKeyedStateHandle != null) { + subtaskKeyedStateHandles.add(intersectedKeyedStateHandle); + } } return subtaskKeyedStateHandles; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java index b4fa91f139c55..dc9c97df5053a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java @@ -31,8 +31,8 @@ public interface KeyedStateHandle extends StateObject { KeyGroupRange getKeyGroupRange(); /** - * Returns a keyed state handle which contains the states for the given - * key groups. + * Returns a state over a range that is the intersection between this + * handle's key-group range and the provided key-group range. * * @param keyGroupRange The key group range to intersect with */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java index 96edccbb103d1..5db01386d6045 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java @@ -109,15 +109,17 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex return operatorStateCheckpointOutputStream; } - public RunnableFuture getKeyedStateStreamFuture() throws IOException { - return closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream); + public RunnableFuture getKeyedStateStreamFuture() throws IOException { + KeyGroupsStateHandle keyGroupsStateHandle = closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream); + return new DoneFuture(keyGroupsStateHandle); } public RunnableFuture getOperatorStateStreamFuture() throws IOException { - return closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream); + OperatorStateHandle operatorStateHandle = closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream); + return new DoneFuture<>(operatorStateHandle); } - private RunnableFuture closeAndUnregisterStreamToObtainStateHandle( + private T closeAndUnregisterStreamToObtainStateHandle( NonClosingCheckpointOutputStream stream) throws IOException { if (null == stream) { return null; @@ -126,7 +128,7 @@ private RunnableFuture closeAndUnregisterStream closableRegistry.unregisterClosable(stream.getDelegate()); // for now we only support synchronous writing - return new DoneFuture<>(stream.closeAndGetHandle()); + return stream.closeAndGetHandle(); } private void closeAndUnregisterStream(NonClosingCheckpointOutputStream stream) throws IOException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 1c06c2c2bc3c5..a332d7dba1196 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -425,10 +425,8 @@ private void restorePartitionedState(Collection state) throws int keyGroupIndex = groupOffset.f0; long offset = groupOffset.f1; - // Skip those key groups that don't belong to the backend. - if (!keyGroupRange.contains(keyGroupIndex)) { - continue; - } + // Check that restored key groups all belong to the backend. + Preconditions.checkState(keyGroupRange.contains(keyGroupIndex), "The key group must belong to the backend."); fsDataInputStream.seek(offset); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 8ed5c73e674dc..117c70d3139fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -58,7 +58,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import javax.swing.plaf.basic.BasicSplitPaneUI; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index 96c91e8ecdad6..8aa76a5f852f7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateUtil; @@ -32,7 +31,7 @@ public class OperatorSnapshotResult { private RunnableFuture keyedStateManagedFuture; - private RunnableFuture keyedStateRawFuture; + private RunnableFuture keyedStateRawFuture; private RunnableFuture operatorStateManagedFuture; private RunnableFuture operatorStateRawFuture; @@ -42,7 +41,7 @@ public OperatorSnapshotResult() { public OperatorSnapshotResult( RunnableFuture keyedStateManagedFuture, - RunnableFuture keyedStateRawFuture, + RunnableFuture keyedStateRawFuture, RunnableFuture operatorStateManagedFuture, RunnableFuture operatorStateRawFuture) { this.keyedStateManagedFuture = keyedStateManagedFuture; @@ -59,11 +58,11 @@ public void setKeyedStateManagedFuture(RunnableFuture keyedSta this.keyedStateManagedFuture = keyedStateManagedFuture; } - public RunnableFuture getKeyedStateRawFuture() { + public RunnableFuture getKeyedStateRawFuture() { return keyedStateRawFuture; } - public void setKeyedStateRawFuture(RunnableFuture keyedStateRawFuture) { + public void setKeyedStateRawFuture(RunnableFuture keyedStateRawFuture) { this.keyedStateRawFuture = keyedStateRawFuture; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index e55c84abfceb6..11e8e0d2c8fbd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; @@ -851,7 +850,7 @@ private static final class AsyncCheckpointRunnable implements Runnable, Closeabl private final List snapshotInProgressList; private RunnableFuture futureKeyedBackendStateHandles; - private RunnableFuture futureKeyedStreamStateHandles; + private RunnableFuture futureKeyedStreamStateHandles; private List nonPartitionedStateHandles; @@ -894,7 +893,7 @@ public void run() { try { // Keyed state handle future, currently only one (the head) operator can have this KeyedStateHandle keyedStateHandleBackend = FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles); - KeyGroupsStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles); + KeyedStateHandle keyedStateHandleStream = FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles); List operatorStatesBackend = new ArrayList<>(snapshotInProgressList.size()); List operatorStatesStream = new ArrayList<>(snapshotInProgressList.size()); @@ -989,7 +988,7 @@ private SubtaskState createSubtaskStateFromSnapshotStateHandles( ChainedStateHandle chainedOperatorStateBackend, ChainedStateHandle chainedOperatorStateStream, KeyedStateHandle keyedStateHandleBackend, - KeyGroupsStateHandle keyedStateHandleStream) { + KeyedStateHandle keyedStateHandleStream) { boolean hasAnyState = keyedStateHandleBackend != null || keyedStateHandleStream != null diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index eeee8dcea62e0..8f42c1ae59cab 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -51,7 +51,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; @@ -559,11 +559,11 @@ public void testFailingBackendSnapshotMethod() throws Exception { final CloseableRegistry closeableRegistry = new CloseableRegistry(); - RunnableFuture futureKeyGroupStateHandle = mock(RunnableFuture.class); + RunnableFuture futureKeyedStateHandle = mock(RunnableFuture.class); RunnableFuture futureOperatorStateHandle = mock(RunnableFuture.class); StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class); - when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyGroupStateHandle); + when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle); when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle); OperatorSnapshotResult operatorSnapshotResult = spy(new OperatorSnapshotResult()); @@ -609,9 +609,9 @@ public void testFailingBackendSnapshotMethod() throws Exception { verify(context).close(); verify(operatorSnapshotResult).cancel(); - verify(futureKeyGroupStateHandle).cancel(anyBoolean()); + verify(futureKeyedStateHandle).cancel(anyBoolean()); verify(futureOperatorStateHandle).cancel(anyBoolean()); - verify(futureKeyGroupStateHandle).cancel(anyBoolean()); + verify(futureKeyedStateHandle).cancel(anyBoolean()); } @Test diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java index 8f34c072e81e6..f57eed1fdc309 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.util.TestLogger; @@ -46,8 +45,8 @@ public void testCancelAndCleanup() throws Exception { RunnableFuture keyedStateManagedFuture = mock(RunnableFuture.class); when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle); - KeyGroupsStateHandle keyedRawStateHandle = mock(KeyGroupsStateHandle.class); - RunnableFuture keyedStateRawFuture = mock(RunnableFuture.class); + KeyedStateHandle keyedRawStateHandle = mock(KeyedStateHandle.class); + RunnableFuture keyedStateRawFuture = mock(RunnableFuture.class); when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle); OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 065ef6fdbad16..f34522b9831ac 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -61,7 +61,6 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; @@ -460,7 +459,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { StreamOperator streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class); - KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class); + KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class); OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class); OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class); @@ -564,8 +563,8 @@ public SubtaskState answer(InvocationOnMock invocation) throws Throwable { (ChainedStateHandle)invocation.getArguments()[0], (ChainedStateHandle)invocation.getArguments()[1], (ChainedStateHandle)invocation.getArguments()[2], - (KeyGroupsStateHandle)invocation.getArguments()[3], - (KeyGroupsStateHandle)invocation.getArguments()[4]); + (KeyedStateHandle)invocation.getArguments()[3], + (KeyedStateHandle)invocation.getArguments()[4]); } }); @@ -576,7 +575,7 @@ public SubtaskState answer(InvocationOnMock invocation) throws Throwable { StreamOperator streamOperator = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); KeyedStateHandle managedKeyedStateHandle = mock(KeyedStateHandle.class); - KeyGroupsStateHandle rawKeyedStateHandle = mock(KeyGroupsStateHandle.class); + KeyedStateHandle rawKeyedStateHandle = mock(KeyedStateHandle.class); OperatorStateHandle managedOperatorStateHandle = mock(OperatorStateHandle.class); OperatorStateHandle rawOperatorStateHandle = mock(OperatorStateHandle.class);