From 20ef549c21115c6548a23cb068075494add0cc04 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Fri, 10 Mar 2017 17:55:45 +0100 Subject: [PATCH] [FLINK-5985] Report no task states for stateless tasks on checkpointing --- .../state/RocksDBKeyedStateBackend.java | 9 ++- .../state/DefaultOperatorStateBackend.java | 2 +- .../flink/runtime/state/DoneFuture.java | 16 +++- .../state/ManagedInitializationContext.java | 3 +- .../flink/runtime/state/Snapshotable.java | 5 +- .../state/heap/HeapKeyedStateBackend.java | 8 +- .../checkpoint/PendingCheckpointTest.java | 35 ++++++++- .../state/OperatorStateBackendTest.java | 19 ++++- .../runtime/state/StateBackendTestBase.java | 48 +++++------- .../flink/runtime/state/StateUtilTest.java | 2 +- .../api/operators/OperatorSnapshotResult.java | 12 +++ .../streaming/runtime/tasks/StreamTask.java | 32 +++++++- .../runtime/tasks/StreamTaskTest.java | 74 ++++++++++++++++++- 13 files changed, 213 insertions(+), 52 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index eb926c0bfa7e99..11ca715181f30f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -238,6 +238,10 @@ public int getKeyGroupPrefixBytes() { return keyGroupPrefixBytes; } + private boolean hasRegisteredState() { + return !kvStateInformation.isEmpty(); + } + /** * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always @@ -265,13 +269,12 @@ public RunnableFuture snapshot( if (db != null) { - if (kvStateInformation.isEmpty()) { + if (!hasRegisteredState()) { if (LOG.isDebugEnabled()) { LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp + " . Returning null."); } - - return new DoneFuture<>(null); + return DoneFuture.nullValue(); } snapshotOperation.takeDBSnapShot(checkpointId, timestamp); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 8dcf49e472160a..2497a00e1e7cf1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -161,7 +161,7 @@ public RunnableFuture snapshot( CheckpointOptions checkpointOptions) throws Exception { if (registeredStates.isEmpty()) { - return new DoneFuture<>(null); + return DoneFuture.nullValue(); } List> metaInfoList = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java index 777ab691e180c6..d2d808dc660015 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DoneFuture.java @@ -30,10 +30,13 @@ * @param The type of object in this {@code Future}. */ public class DoneFuture implements RunnableFuture { - private final T keyGroupsStateHandle; - public DoneFuture(T keyGroupsStateHandle) { - this.keyGroupsStateHandle = keyGroupsStateHandle; + private static final DoneFuture NULL_FUTURE = new DoneFuture(null); + + private final T payload; + + public DoneFuture(T payload) { + this.payload = payload; } @Override @@ -53,7 +56,7 @@ public boolean isDone() { @Override public T get() throws InterruptedException, ExecutionException { - return keyGroupsStateHandle; + return payload; } @Override @@ -67,4 +70,9 @@ public T get( public void run() { } + + @SuppressWarnings("unchecked") + public static DoneFuture nullValue() { + return (DoneFuture) NULL_FUTURE; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java index 5255c4330a578b..522aca6f01538e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java @@ -36,7 +36,8 @@ public interface ManagedInitializationContext { /** - * Returns true, if some managed state was restored from the snapshot of a previous execution. + * Returns true, if state was restored from the snapshot of a previous execution. This returns always false for + * stateless tasks. */ boolean isRestored(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java index 0d92b463207f7e..c7e62f09be594a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java @@ -18,12 +18,13 @@ package org.apache.flink.runtime.state; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; + import java.util.Collection; import java.util.concurrent.RunnableFuture; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; /** - * Interface for operations that can perform snapshots of their state. + * Interface for operators that can perform snapshots of their state. * * @param Generic type of the state object that is created as handle to snapshots. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 4a5455a11ff62a..1f0d87ef0863ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -144,6 +144,10 @@ private StateTable tryRegisterStateTable( return stateTable; } + private boolean hasRegisteredState() { + return !stateTables.isEmpty(); + } + @Override public InternalValueState createValueState( TypeSerializer namespaceSerializer, @@ -219,8 +223,8 @@ public RunnableFuture snapshot( CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { - if (stateTables.isEmpty()) { - return new DoneFuture<>(null); + if (!hasRegisteredState()) { + return DoneFuture.nullValue(); } try (CheckpointStreamFactory.CheckpointStateOutputStream stream = streamFactory. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 6f04f39b37fa8d..105d10e3247a93 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -24,11 +24,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; - +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; - import org.mockito.Mockito; import java.io.File; @@ -48,6 +47,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; public class PendingCheckpointTest { @@ -55,7 +55,10 @@ public class PendingCheckpointTest { private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID(); static { - ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class)); + ExecutionVertex vertex = mock(ExecutionVertex.class); + when(vertex.getMaxParallelism()).thenReturn(128); + when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(1); + ACK_TASKS.put(ATTEMPT_ID, vertex); } @Rule @@ -287,6 +290,32 @@ public void testPendingCheckpointStatsCallbacks() throws Exception { } } + /** + * FLINK-5985 + *

+ * Ensures that subtasks that acknowledge their state as 'null' are considered stateless. This means that they + * should not appear in the task states map of the checkpoint. + */ + @Test + public void testNullSubtaskStateLeadsToStatelessTask() throws Exception { + PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null); + pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class)); + Assert.assertTrue(pending.getTaskStates().isEmpty()); + } + + /** + * FLINK-5985 + *

+ * This tests checks the inverse of {@link #testNullSubtaskStateLeadsToStatelessTask()}. We want to test that + * for subtasks that acknowledge some state are given an entry in the task states of the checkpoint. + */ + @Test + public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception { + PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null); + pending.acknowledgeTask(ATTEMPT_ID, mock(SubtaskState.class), mock(CheckpointMetrics.class)); + Assert.assertFalse(pending.getTaskStates().isEmpty()); + } + // ------------------------------------------------------------------------ private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 94df5246134f9b..d883d6e0224a6b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -23,11 +23,14 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.FutureUtil; +import org.junit.Assert; import org.junit.Test; import java.io.Serializable; import java.util.Collections; import java.util.Iterator; +import java.util.concurrent.RunnableFuture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -143,6 +146,19 @@ public void testRegisterStates() throws Exception { } } + @Test + public void testSnapshotEmpty() throws Exception { + DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); + CheckpointStreamFactory streamFactory = + abstractStateBackend.createStreamFactory(new JobID(), "testOperator"); + + RunnableFuture snapshot = + operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint()); + + OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot); + Assert.assertNull(stateHandle); + } + @Test public void testSnapshotRestore() throws Exception { DefaultOperatorStateBackend operatorStateBackend = createNewOperatorStateBackend(); @@ -166,7 +182,8 @@ public void testSnapshotRestore() throws Exception { listState3.add(20); CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator"); - OperatorStateHandle stateHandle = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()).get(); + OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet( + operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint())); try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 40ac72c212501d..d2f6757412966e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -53,6 +53,7 @@ import org.apache.flink.runtime.state.heap.StateTable; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.types.IntValue; +import org.apache.flink.util.FutureUtil; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -67,7 +68,6 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.RunnableFuture; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; @@ -192,7 +192,7 @@ public void testValueState() throws Exception { assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -203,7 +203,7 @@ public void testValueState() throws Exception { state.update("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -404,7 +404,7 @@ public void testMultipleValueStates() throws Exception { assertEquals(13, (int) state2.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); backend = restoreKeyedBackend( @@ -477,7 +477,7 @@ public void testValueStateNullUpdate() throws Exception { assertEquals(42L, (long) state.value()); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); @@ -522,7 +522,7 @@ public void testListState() { assertEquals("1", joiner.join(getSerializedList(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -533,7 +533,7 @@ public void testListState() { state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -621,7 +621,7 @@ public void testReducingState() { assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -632,7 +632,7 @@ public void testReducingState() { state.add("u3"); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -723,7 +723,7 @@ public void testFoldingState() { assertEquals("Fold-Initial:,1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -735,7 +735,7 @@ public void testFoldingState() { state.add(103); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -827,7 +827,7 @@ public void testMapState() { getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); // make some more modifications backend.setCurrentKey(1); @@ -839,7 +839,7 @@ public void testMapState() { state.putAll(new HashMap() {{ put(1031, "1031"); put(1032, "1032"); }}); // draw another snapshot - KeyGroupsStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); // validate the original state backend.setCurrentKey(1); @@ -1159,7 +1159,7 @@ public void testKeyGroupSnapshotRestore() throws Exception { state.update("ShouldBeInSecondHalf"); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forFullCheckpoint())); List firstHalfKeyGroupStates = StateAssignmentOperation.getKeyGroupsStateHandles( Collections.singletonList(snapshot), @@ -1226,7 +1226,7 @@ public void testValueStateRestoreWithWrongSerializers() { state.update("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1277,7 +1277,7 @@ public void testListStateRestoreWithWrongSerializers() { state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1330,7 +1330,7 @@ public void testReducingStateRestoreWithWrongSerializers() { state.add("2"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1381,7 +1381,7 @@ public void testMapStateRestoreWithWrongSerializers() { state.put("2", "Second"); // draw a snapshot - KeyGroupsStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); // restore the first snapshot and validate it @@ -1657,7 +1657,7 @@ public void testQueryableStateRegistration() throws Exception { eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class)); - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forFullCheckpoint())); backend.dispose(); @@ -1688,7 +1688,7 @@ public void testEmptyStateCheckpointing() { ListStateDescriptor kvId = new ListStateDescriptor<>("id", String.class); // draw a snapshot - KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint())); + KeyGroupsStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forFullCheckpoint())); assertNull(snapshot); backend.dispose(); @@ -1790,12 +1790,4 @@ private static Map getSerializedMap( return KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer); } } - - private KeyGroupsStateHandle runSnapshot(RunnableFuture snapshotRunnableFuture) throws Exception { - if(!snapshotRunnableFuture.isDone()) { - Thread runner = new Thread(snapshotRunnableFuture); - runner.start(); - } - return snapshotRunnableFuture.get(); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java index e59d027e87befe..d6966d0a039bb0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java @@ -30,7 +30,7 @@ public class StateUtilTest extends TestLogger { */ @Test public void testDiscardRunnableFutureWithNullValue() throws Exception { - RunnableFuture> stateFuture = new DoneFuture<>(null); + RunnableFuture> stateFuture = DoneFuture.nullValue(); StateUtil.discardStateFuture(stateFuture); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index 83697ae089336f..b1c94cb5ebe65c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -121,4 +121,16 @@ public void cancel() throws Exception { throw exception; } } + + public boolean hasKeyedState() { + return keyedStateManagedFuture != null || keyedStateRawFuture != null; + } + + public boolean hasOperatorState() { + return operatorStateManagedFuture != null || operatorStateRawFuture != null; + } + + public boolean hasState() { + return hasKeyedState() || hasOperatorState(); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 1e208eefd7b48c..d4cfb283d6076b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -57,7 +57,6 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FutureUtil; import org.apache.flink.util.Preconditions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -925,14 +924,16 @@ public void run() { ChainedStateHandle chainedOperatorStateStream = new ChainedStateHandle<>(operatorStatesStream); - SubtaskState subtaskState = new SubtaskState( + SubtaskState subtaskState = createSubtaskStateFromSnapshotStateHandles( chainedNonPartitionedOperatorsState, chainedOperatorStateBackend, chainedOperatorStateStream, keyedStateHandleBackend, keyedStateHandleStream); - if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { + if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, + CheckpointingOperation.AsynCheckpointState.COMPLETED)) { + owner.getEnvironment().acknowledgeCheckpoint( checkpointMetaData.getCheckpointId(), checkpointMetrics, @@ -982,6 +983,31 @@ public void close() { } } + private SubtaskState createSubtaskStateFromSnapshotStateHandles( + ChainedStateHandle chainedNonPartitionedOperatorsState, + ChainedStateHandle chainedOperatorStateBackend, + ChainedStateHandle chainedOperatorStateStream, + KeyGroupsStateHandle keyedStateHandleBackend, + KeyGroupsStateHandle keyedStateHandleStream) { + + boolean hasAnyState = keyedStateHandleBackend != null + || keyedStateHandleStream != null + || !chainedOperatorStateBackend.isEmpty() + || !chainedOperatorStateStream.isEmpty() + || !chainedNonPartitionedOperatorsState.isEmpty(); + + // we signal a stateless task by reporting null, so that there are no attempts to assign empty state to + // stateless tasks on restore. This allows for simple job modifications that only concern stateless without + // the need to assign them uids to match their (always empty) states. + return hasAnyState ? new SubtaskState( + chainedNonPartitionedOperatorsState, + chainedOperatorStateBackend, + chainedOperatorStateStream, + keyedStateHandleBackend, + keyedStateHandleStream) + : null; + } + private void cleanup() throws Exception { if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) { LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 382605189797cb..d7e3d6cbde7002 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks; import akka.dispatch.Futures; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; @@ -88,10 +87,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; - import org.apache.flink.util.TestLogger; +import org.junit.Assert; import org.junit.Test; - import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -111,8 +109,10 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -640,6 +640,74 @@ public SubtaskState answer(InvocationOnMock invocation) throws Throwable { verify(rawOperatorStateHandle).discardState(); } + /** + * FLINK-5985 + * + * This test ensures that empty snapshots (no op/keyed stated whatsoever) will be reported as stateless tasks. This + * happens by translating an empty {@link SubtaskState} into reporting 'null' to #acknowledgeCheckpoint. + */ + @Test + public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + TaskInfo mockTaskInfo = mock(TaskInfo.class); + + when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); + when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); + + Environment mockEnvironment = mock(Environment.class); + + // latch blocks until the async checkpoint thread acknowledges + final OneShotLatch checkpointCompletedLatch = new OneShotLatch(); + final List checkpointResult = new ArrayList<>(1); + + // we remember what is acknowledged (expected to be null as our task will snapshot empty states). + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + SubtaskState subtaskState = invocationOnMock.getArgumentAt(2, SubtaskState.class); + checkpointResult.add(subtaskState); + checkpointCompletedLatch.trigger(); + return null; + } + }).when(mockEnvironment).acknowledgeCheckpoint(anyLong(), any(CheckpointMetrics.class), any(SubtaskState.class)); + + when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo); + + StreamTask> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS); + CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); + streamTask.setEnvironment(mockEnvironment); + + // mock the operators + StreamOperator statelessOperator = + mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + + // mock the returned empty snapshot result (all state handles are null) + OperatorSnapshotResult statelessOperatorSnapshotResult = new OperatorSnapshotResult(); + when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))) + .thenReturn(statelessOperatorSnapshotResult); + + // set up the task + StreamOperator[] streamOperators = {statelessOperator}; + OperatorChain> operatorChain = mock(OperatorChain.class); + when(operatorChain.getAllOperators()).thenReturn(streamOperators); + + Whitebox.setInternalState(streamTask, "isRunning", true); + Whitebox.setInternalState(streamTask, "lock", new Object()); + Whitebox.setInternalState(streamTask, "operatorChain", operatorChain); + Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry()); + Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration())); + Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool()); + + streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forFullCheckpoint()); + checkpointCompletedLatch.await(30, TimeUnit.SECONDS); + streamTask.cancel(); + + // ensure that 'null' was acknowledged as subtask state + Assert.assertNull(checkpointResult.get(0)); + } + // ------------------------------------------------------------------------ // Test Utilities // ------------------------------------------------------------------------