From a0edef186c49520e353fe6cdc3321ef208e1bb3b Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 1 Dec 2016 13:25:05 +0100 Subject: [PATCH 1/2] [FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation Adds exception handling to the stream operators for the snapshotState method. A failing snapshot operation will trigger the clean up of all so far generated state resources. This will avoid that in case of a failing snapshot operation resources (e.g. files) are left behind. Add test case for OperatorSnapshotResult Add StateSnapshotContextSynchronousImplTest Add AbstractStreamOperator failing snapshot tests --- .../state/RocksDBKeyedStateBackend.java | 12 +- .../state/RocksDBAsyncSnapshotTest.java | 77 ++++++++- .../runtime/state/StateSnapshotContext.java | 7 +- .../StateSnapshotContextSynchronousImpl.java | 37 ++++- .../filesystem/FsCheckpointStreamFactory.java | 67 +++++--- .../FsCheckpointStateOutputStreamTest.java | 73 ++++++++ .../source/ContinuousFileReaderOperator.java | 15 +- .../util/StreamingFunctionUtils.java | 11 +- .../api/operators/AbstractStreamOperator.java | 102 +++++++++--- .../api/operators/OperatorSnapshotResult.java | 56 ++++++- .../operators/async/AsyncWaitOperator.java | 21 ++- .../operators/GenericWriteAheadSink.java | 14 +- .../streaming/runtime/tasks/StreamTask.java | 42 ++++- .../operators/AbstractStreamOperatorTest.java | 157 +++++++++++++++++- .../operators/OperatorSnapshotResultTest.java | 97 +++++------ ...ateSnapshotContextSynchronousImplTest.java | 73 +++++++- 16 files changed, 711 insertions(+), 150 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index b207af62a86c6..6587ca5002ee1 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -284,8 +284,8 @@ public KeyGroupsStateHandle performOperation() throws Exception { } } - LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", asynchronous part) in thread " + - Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms."); + LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", + streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); return snapshotOperation.getSnapshotResultStateHandle(); } @@ -346,7 +346,7 @@ static final class RocksDBSnapshotOperation { * @param checkpointId id of the checkpoint for which we take the snapshot * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot */ - public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) throws IOException { + public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) { Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size()); this.checkpointId = checkpointId; @@ -427,8 +427,8 @@ public void releaseSnapshotResources(boolean canceled) { if (null != snapshotResultStateHandle) { snapshotResultStateHandle.discardState(); } - } catch (Exception ignored) { - LOG.warn("Exception occurred during snapshot state handle cleanup: " + ignored); + } catch (Exception e) { + LOG.warn("Exception occurred during snapshot state handle cleanup.", e); } } } @@ -452,7 +452,7 @@ public KeyGroupsStateHandle getSnapshotResultStateHandle() { return snapshotResultStateHandle; } - private void writeKVStateMetaData() throws IOException, InterruptedException { + private void writeKVStateMetaData() throws IOException { List> metaInfoList = new ArrayList<>(stateBackend.kvStateInformation.size()); diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 70f74b0f1b5ff..46a184a035057 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.fs.FSDataInputStream; @@ -31,8 +32,14 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.SubtaskState; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; @@ -47,6 +54,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.FutureUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -69,10 +77,21 @@ import java.util.Arrays; import java.util.UUID; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; /** * Tests for asynchronous RocksDB Key/Value state checkpoints. @@ -182,7 +201,7 @@ public void acknowledgeCheckpoint( testHarness.waitForTaskCompletion(); if (mockEnv.wasFailedExternally()) { - Assert.fail("Unexpected exception during execution."); + fail("Unexpected exception during execution."); } } @@ -259,7 +278,7 @@ public String getKey(String value) throws Exception { if (mockEnv.wasFailedExternally()) { throw new AsynchronousException(new InterruptedException("Exception was thrown as expected.")); } - Assert.fail("Operation completed. Cancel failed."); + fail("Operation completed. Cancel failed."); } catch (Exception expected) { AsynchronousException asynchronousException = null; @@ -268,7 +287,7 @@ public String getKey(String value) throws Exception { } else if (expected.getCause() instanceof AsynchronousException) { asynchronousException = (AsynchronousException) expected.getCause(); } else { - Assert.fail("Unexpected exception: " + expected); + fail("Unexpected exception: " + expected); } // we expect the exception from canceling snapshots @@ -279,6 +298,58 @@ public String getKey(String value) throws Exception { } } + /** + * Test that the snapshot files are cleaned up in case of a failure during the snapshot + * procedure. + */ + @Test + public void testCleanupOfSnapshotsInFailureCase() throws Exception { + long checkpointId = 1L; + long timestamp = 42L; + + Environment env = new DummyEnvironment("test task", 1, 0); + + CheckpointStreamFactory.CheckpointStateOutputStream outputStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + CheckpointStreamFactory checkpointStreamFactory = mock(CheckpointStreamFactory.class); + AbstractStateBackend stateBackend = mock(AbstractStateBackend.class); + + final IOException testException = new IOException("Test exception"); + + doReturn(checkpointStreamFactory).when(stateBackend).createStreamFactory(any(JobID.class), anyString()); + doThrow(testException).when(outputStream).write(anyInt()); + doReturn(outputStream).when(checkpointStreamFactory).createCheckpointStateOutputStream(eq(checkpointId), eq(timestamp)); + + RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend); + + backend.setDbStoragePath("file:///tmp/foobar"); + + AbstractKeyedStateBackend keyedStateBackend = backend.createKeyedStateBackend( + env, + new JobID(), + "test operator", + VoidSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + null); + + // register a state so that the state backend has to checkpoint something + keyedStateBackend.getPartitionedState( + "namespace", + StringSerializer.INSTANCE, + new ValueStateDescriptor<>("foobar", String.class)); + + RunnableFuture snapshotFuture = keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory); + + try { + FutureUtil.runIfNotDoneAndGet(snapshotFuture); + fail("Expected an exception to be thrown here."); + } catch (ExecutionException e) { + Assert.assertEquals(testException, e.getCause()); + } + + verify(outputStream).close(); + } + @Test public void testConsistentSnapshotSerializationFlagsAndMasks() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java index 4dbbeaf6342cd..d45aedffdcff1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java @@ -21,8 +21,9 @@ import org.apache.flink.annotation.PublicEvolving; /** - * This interface provides a context in which operators that use managed (i.e. state that is managed by state - * backends) or raw (i.e. the operator can write it's state streams) state can perform a snapshot. + * This interface provides a context in which operators that use managed (i.e. state that is managed + * by state backends) or raw (i.e. the operator can write it's state streams) state can perform a + * snapshot. */ @PublicEvolving public interface StateSnapshotContext extends FunctionSnapshotContext { @@ -37,4 +38,4 @@ public interface StateSnapshotContext extends FunctionSnapshotContext { */ OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception; -} \ No newline at end of file +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java index ce8a6c4c3b516..cbdde4b8a8dd1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -127,4 +128,38 @@ private RunnableFuture closeAndUnregisterStream return new DoneFuture<>(stream.closeAndGetHandle()); } -} \ No newline at end of file + private void closeAndUnregisterStream(NonClosingCheckpointOutputStream stream) throws IOException { + Preconditions.checkNotNull(stream); + + closableRegistry.unregisterClosable(stream.getDelegate()); + stream.getDelegate().close(); + } + + public void close() throws IOException { + IOException exception = null; + + if (keyedStateCheckpointOutputStream != null) { + try { + closeAndUnregisterStream(keyedStateCheckpointOutputStream); + } catch (IOException e) { + exception = ExceptionUtils.firstOrSuppressed( + new IOException("Could not close the raw keyed state checkpoint output stream.", e), + exception); + } + } + + if (operatorStateCheckpointOutputStream != null) { + try { + closeAndUnregisterStream(operatorStateCheckpointOutputStream); + } catch (IOException e) { + exception = ExceptionUtils.firstOrSuppressed( + new IOException("Could not close the raw operator state checkpoint output stream.", e), + exception); + } + } + + if (exception != null) { + throw exception; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index 1be3abf49720b..30b1da6a2ed46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -266,17 +266,21 @@ public void close() { if (outStream != null) { try { outStream.close(); - fs.delete(statePath, false); - + } catch (Throwable throwable) { + LOG.warn("Could not close the state stream for {}.", statePath, throwable); + } finally { try { - FileUtils.deletePathIfEmpty(fs, basePath); - } catch (Exception ignored) { - LOG.debug("Could not delete the parent directory {}.", basePath, ignored); + fs.delete(statePath, false); + + try { + FileUtils.deletePathIfEmpty(fs, basePath); + } catch (Exception ignored) { + LOG.debug("Could not delete the parent directory {}.", basePath, ignored); + } + } catch (Exception e) { + LOG.warn("Cannot delete closed and discarded state stream for {}.", statePath, e); } } - catch (Exception e) { - LOG.warn("Cannot delete closed and discarded state stream for " + statePath, e); - } } } } @@ -297,20 +301,41 @@ public StreamStateHandle closeAndGetHandle() throws IOException { return new ByteStreamStateHandle(createStatePath().toString(), bytes); } else { - flush(); - - closed = true; - pos = writeBuffer.length; - - long size = -1; - // make a best effort attempt to figure out the size try { - size = outStream.getPos(); - } catch (Exception ignored) {} - - outStream.close(); - - return new FileStateHandle(statePath, size); + flush(); + + pos = writeBuffer.length; + + long size = -1L; + + // make a best effort attempt to figure out the size + try { + size = outStream.getPos(); + } catch (Exception ignored) {} + + outStream.close(); + + return new FileStateHandle(statePath, size); + } catch (Exception exception) { + try { + fs.delete(statePath, false); + + try { + FileUtils.deletePathIfEmpty(fs, basePath); + } catch (Exception parentDirDeletionFailure) { + LOG.debug("Could not delete the parent directory {}.", basePath, parentDirDeletionFailure); + } + } catch (Exception deleteException) { + LOG.warn("Could not delete the checkpoint stream file {}.", + statePath, deleteException); + } + + throw new IOException("Could not flush and close the file system " + + "output stream to " + statePath + " in order to obtain the " + + "stream state handle", exception); + } finally { + closed = true; + } } } else { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java index 6d371b1531fb2..8617193eaace3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java @@ -18,14 +18,17 @@ package org.apache.flink.runtime.state.filesystem; +import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; import java.io.DataInputStream; import java.io.File; @@ -37,6 +40,13 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class FsCheckpointStateOutputStreamTest { @@ -108,6 +118,69 @@ public void testGetPos() throws Exception { stream.closeAndGetHandle(); } + /** + * Tests that the underlying stream file is deleted upon calling close. + */ + @Test + public void testCleanupWhenClosingStream() throws IOException { + + final FileSystem fs = mock(FileSystem.class); + final FSDataOutputStream outputStream = mock(FSDataOutputStream.class); + + final ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + + when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream); + + CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream( + TEMP_DIR_PATH, + fs, + 4, + 0); + + // this should create the underlying file stream + stream.write(new byte[]{1,2,3,4,5}); + + verify(fs).create(any(Path.class), anyBoolean()); + + stream.close(); + + verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean()); + } + + /** + * Tests that the underlying stream file is deleted if the closeAndGetHandle method fails. + */ + @Test + public void testCleanupWhenFailingCloseAndGetHandle() throws IOException { + final FileSystem fs = mock(FileSystem.class); + final FSDataOutputStream outputStream = mock(FSDataOutputStream.class); + + final ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(Path.class); + + when(fs.create(pathCaptor.capture(), anyBoolean())).thenReturn(outputStream); + doThrow(new IOException("Test IOException.")).when(outputStream).close(); + + CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream( + TEMP_DIR_PATH, + fs, + 4, + 0); + + // this should create the underlying file stream + stream.write(new byte[]{1,2,3,4,5}); + + verify(fs).create(any(Path.class), anyBoolean()); + + try { + stream.closeAndGetHandle(); + fail("Expected IOException"); + } catch (IOException ioE) { + // expected exception + } + + verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean()); + } + private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception { FsCheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 6419aa681e04c..ab1ad1d0aa3c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -394,10 +394,19 @@ public void snapshotState(StateSnapshotContext context) throws Exception { int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); checkpointedState.clear(); + List readerState = reader.getReaderState(); - for (TimestampedFileInputSplit split : readerState) { - // create a new partition for each entry. - checkpointedState.add(split); + + try { + for (TimestampedFileInputSplit split : readerState) { + // create a new partition for each entry. + checkpointedState.add(split); + } + } catch (Exception e) { + checkpointedState.clear(); + + throw new Exception("Could not add timestamped file input splits to to operator " + + "state backend of operator " + getOperatorName() + '.', e); } if (LOG.isDebugEnabled()) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java index d1d264fb7c291..679ef0b27e2a7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/util/StreamingFunctionUtils.java @@ -131,8 +131,15 @@ private static boolean trySnapshotFunctionState( listState.clear(); if (null != partitionableState) { - for (Serializable statePartition : partitionableState) { - listState.add(statePartition); + try { + for (Serializable statePartition : partitionableState) { + listState.add(statePartition); + } + } catch (Exception e) { + listState.clear(); + + throw new Exception("Could not write partitionable state to operator " + + "state backend.", e); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a21660c2cee11..02efc88fbafe6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -66,6 +66,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Collection; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -342,21 +343,50 @@ public final OperatorSnapshotResult snapshotState( StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl( checkpointId, timestamp, streamFactory, keyGroupRange, getContainingTask().getCancelables()); - snapshotState(snapshotContext); + try { + snapshotState(snapshotContext); + } catch (Exception e) { + try { + snapshotContext.close(); + } catch (IOException closingException) { + e.addSuppressed(closingException); + } + + throw new Exception("Could not snapshot the operator " + getOperatorName() + + " for checkpoint " + checkpointId + '.', e); + } OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult(); - snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture()); - snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture()); + try { + snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture()); + snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture()); - if (null != operatorStateBackend) { - snapshotInProgress.setOperatorStateManagedFuture( + if (null != operatorStateBackend) { + snapshotInProgress.setOperatorStateManagedFuture( operatorStateBackend.snapshot(checkpointId, timestamp, streamFactory)); - } + } - if (null != keyedStateBackend) { - snapshotInProgress.setKeyedStateManagedFuture( + if (null != keyedStateBackend) { + snapshotInProgress.setKeyedStateManagedFuture( keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory)); + } + } catch (Exception snapshotException) { + try { + snapshotInProgress.cancel(); + } catch (Exception e) { + snapshotException.addSuppressed(e); + } + + throw new Exception("Could not complete snapshot " + checkpointId + " for operator " + + getOperatorName() + '.', snapshotException); + } finally { + try { + snapshotContext.close(); + } catch (IOException e) { + LOG.warn("Could not close the snapshot context for checkpoint {} and operator {}.", + checkpointId, getOperatorName(), e); + } } return snapshotInProgress; @@ -369,21 +399,40 @@ public final OperatorSnapshotResult snapshotState( */ public void snapshotState(StateSnapshotContext context) throws Exception { if (getKeyedStateBackend() != null) { - KeyedStateCheckpointOutputStream out = context.getRawKeyedOperatorStateOutput(); + KeyedStateCheckpointOutputStream out; + + try { + out = context.getRawKeyedOperatorStateOutput(); + } catch (Exception exception) { + throw new Exception("Could not open raw keyed operator state stream for " + + getOperatorName() + '.', exception); + } - KeyGroupsList allKeyGroups = out.getKeyGroupList(); - for (int keyGroupIdx : allKeyGroups) { - out.startNewKeyGroup(keyGroupIdx); + try { + KeyGroupsList allKeyGroups = out.getKeyGroupList(); + for (int keyGroupIdx : allKeyGroups) { + out.startNewKeyGroup(keyGroupIdx); - DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out); - dov.writeInt(timerServices.size()); + DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out); + dov.writeInt(timerServices.size()); - for (Map.Entry> entry : timerServices.entrySet()) { - String serviceName = entry.getKey(); - HeapInternalTimerService timerService = entry.getValue(); + for (Map.Entry> entry : timerServices.entrySet()) { + String serviceName = entry.getKey(); + HeapInternalTimerService timerService = entry.getValue(); - dov.writeUTF(serviceName); - timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx); + dov.writeUTF(serviceName); + timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx); + } + } + } catch (Exception exception) { + throw new Exception("Could not write timer service of " + getOperatorName() + + " to checkpoint state stream.", exception); + } finally { + try { + out.close(); + } catch (Exception closeException) { + LOG.warn("Could not close raw keyed operator state stream for {}. This " + + "might have prevented deleting some state data.", getOperatorName(), closeException); } } } @@ -457,6 +506,21 @@ public StreamConfig getOperatorConfig() { public ClassLoader getUserCodeClassloader() { return container.getUserCodeClassLoader(); } + + /** + * Return the operator name. If the runtime context has been set, then the task name with + * subtask index is returned. Otherwise, the simple class name is returned. + * + * @return If runtime context is set, then return task name with subtask index. Otherwise return + * simple class name. + */ + protected String getOperatorName() { + if (runtimeContext != null) { + return runtimeContext.getTaskNameWithSubtasks(); + } else { + return getClass().getSimpleName(); + } + } /** * Returns a context that allows the operator to query information about the execution and also diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index 8b9f7584d5823..913928f1c663d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -20,8 +20,10 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FutureUtil; -import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; /** @@ -81,16 +83,54 @@ public void setOperatorStateRawFuture(RunnableFuture operat this.operatorStateRawFuture = operatorStateRawFuture; } - public void cancel() { - cancelIfNotNull(getKeyedStateManagedFuture()); - cancelIfNotNull(getOperatorStateManagedFuture()); - cancelIfNotNull(getKeyedStateRawFuture()); - cancelIfNotNull(getOperatorStateRawFuture()); + public void cancel() throws Exception { + Exception exception = null; + + try { + cancelIfNotNull(getKeyedStateManagedFuture()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed( + new Exception("Could not properly cancel managed keyed state future.", e), + exception); + } + + try { + cancelIfNotNull(getOperatorStateManagedFuture()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed( + new Exception("Could not properly cancel managed operator state future.", e), + exception); + } + + try { + cancelIfNotNull(getKeyedStateRawFuture()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed( + new Exception("Could not properly cancel raw keyed state future.", e), + exception); + } + + try { + cancelIfNotNull(getOperatorStateRawFuture()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed( + new Exception("Could not properly cancel raw operator state future.", e), + exception); + } + + if (exception != null) { + throw exception; + } } - private static void cancelIfNotNull(Future future) { + private static void cancelIfNotNull(RunnableFuture future) throws Exception { if (null != future) { - future.cancel(true); + if (!future.cancel(true)) { + // the cancellation was not successful because it might have been completed before + StreamStateHandle streamStateHandle = FutureUtil.runIfNotDoneAndGet(future); + + streamStateHandle.discardState(); + } } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java index 754b754dbc451..f43f8b990534c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java @@ -231,18 +231,25 @@ public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); ListState partitionableState = - getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); + getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); partitionableState.clear(); Collection> values = queue.values(); - for (StreamElementQueueEntry value : values) { - partitionableState.add(value.getStreamElement()); - } + try { + for (StreamElementQueueEntry value : values) { + partitionableState.add(value.getStreamElement()); + } + + // add the pending stream element queue entry if the stream element queue is currently full + if (pendingStreamElementQueueEntry != null) { + partitionableState.add(pendingStreamElementQueueEntry.getStreamElement()); + } + } catch (Exception e) { + partitionableState.clear(); - // add the pending stream element queue entry if the stream element queue is currently full - if (pendingStreamElementQueueEntry != null) { - partitionableState.add(pendingStreamElementQueueEntry.getStreamElement()); + throw new Exception("Could not add stream element queue entries to operator state " + + "backend of operator " + getOperatorName() + '.', e); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index 564fa22999c73..7a571ec52deae 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -162,9 +162,17 @@ public void snapshotState(StateSnapshotContext context) throws Exception { saveHandleInState(context.getCheckpointId(), context.getCheckpointTimestamp()); this.checkpointedState.clear(); - for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) { - // create a new partition for each entry. - this.checkpointedState.add(pendingCheckpoint); + + try { + for (PendingCheckpoint pendingCheckpoint : pendingCheckpoints) { + // create a new partition for each entry. + this.checkpointedState.add(pendingCheckpoint); + } + } catch (Exception e) { + checkpointedState.clear(); + + throw new Exception("Could not add panding checkpoints to operator state " + + "backend of operator " + getOperatorName() + '.', e); } int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 530401b4d49c9..775475dc47669 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -57,6 +57,7 @@ import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FutureUtil; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -590,8 +591,20 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws // we cannot broadcast the cancellation markers on the 'operator chain', because it may not // yet be created final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId()); + Exception exception = null; + for (ResultPartitionWriter output : getEnvironment().getAllWriters()) { - output.writeEventToAllChannels(message); + try { + output.writeEventToAllChannels(message); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed( + new Exception("Could not send cancel checkpoint marker to downstream tasks.", e), + exception); + } + } + + if (exception != null) { + throw exception; } return false; @@ -957,7 +970,12 @@ public void close() { // cleanup/release ongoing snapshot operations for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) { if (null != snapshotResult) { - snapshotResult.cancel(); + try { + snapshotResult.cancel(); + } catch (Exception e) { + LOG.warn("Could not properly cancel operator snapshot result in async " + + "checkpoint runnable.", e); + } } } } @@ -1021,24 +1039,30 @@ public void executeCheckpointing() throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("{} - finished synchronous part of checkpoint {}." + - "Alignment duration: {} ms, snapshot duration {} ms", - owner.getName(), checkpointMetaData.getCheckpointId(), - checkpointMetaData.getAlignmentDurationNanos() / 1_000_000, - checkpointMetaData.getSyncDurationMillis()); + "Alignment duration: {} ms, snapshot duration {} ms", + owner.getName(), checkpointMetaData.getCheckpointId(), + checkpointMetaData.getAlignmentDurationNanos() / 1_000_000, + checkpointMetaData.getSyncDurationMillis()); } } finally { if (failed) { // Cleanup to release resources for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) { if (null != operatorSnapshotResult) { - operatorSnapshotResult.cancel(); + try { + operatorSnapshotResult.cancel(); + } catch (Exception e) { + LOG.warn("Could not properly cancel an operator snapshot result.", e); + } } } if (LOG.isDebugEnabled()) { LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." + - "Alignment duration: {} ms, snapshot duration {} ms", - owner.getName(), checkpointMetaData.getCheckpointId()); + "Alignment duration: {} ms, snapshot duration {} ms", + owner.getName(), checkpointMetaData.getCheckpointId(), + checkpointMetaData.getAlignmentDurationNanos() / 1_000_000, + checkpointMetaData.getSyncDurationMillis()); } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index f4051c927d351..409a7326e905b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -19,35 +19,61 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.internal.util.reflection.Whitebox; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.RunnableFuture; import static junit.framework.TestCase.assertTrue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.doReturn; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.spy; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; /** * Tests for the facilities provided by {@link AbstractStreamOperator}. This mostly * tests timers and state and whether they are correctly checkpointed/restored * with key-group reshuffling. */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(AbstractStreamOperator.class) public class AbstractStreamOperatorTest { @Test @@ -452,6 +478,133 @@ public void testStateAndTimerStateShufflingScalingDown() throws Exception { assertTrue(extractResult(testHarness3).isEmpty()); } + /** + * Checks that the state snapshot context is closed after a successful snapshot operation. + */ + @Test + public void testSnapshotMethod() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + final CloseableRegistry closeableRegistry = new CloseableRegistry(); + + StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class); + + whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context); + + CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class); + StreamTask> containingTask = mock(StreamTask.class); + when(containingTask.getCancelables()).thenReturn(closeableRegistry); + + AbstractStreamOperator operator = mock(AbstractStreamOperator.class); + when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod(); + doReturn(containingTask).when(operator).getContainingTask(); + + operator.snapshotState(checkpointId, timestamp, streamFactory); + + verify(context).close(); + } + + /** + * Tests that the created StateSnapshotContextSynchronousImpl is closed in case of a failing + * Operator#snapshotState(StaetSnapshotContextSynchronousImpl) call. + */ + @Test + public void testFailingSnapshotMethod() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + final Exception failingException = new Exception("Test exception"); + + final CloseableRegistry closeableRegistry = new CloseableRegistry(); + + StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class); + + whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context); + + CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class); + StreamTask> containingTask = mock(StreamTask.class); + when(containingTask.getCancelables()).thenReturn(closeableRegistry); + + AbstractStreamOperator operator = mock(AbstractStreamOperator.class); + when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod(); + doReturn(containingTask).when(operator).getContainingTask(); + + // lets fail when calling the actual snapshotState method + doThrow(failingException).when(operator).snapshotState(eq(context)); + + try { + operator.snapshotState(checkpointId, timestamp, streamFactory); + fail("Exception expected."); + } catch (Exception e) { + assertEquals(failingException, e.getCause()); + } + + verify(context).close(); + } + + /** + * Tests that a failing snapshot method call to the keyed state backend will trigger the closing + * of the StateSnapshotContextSynchronousImpl and the cancellation of the + * OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures. + */ + @Test + public void testFailingBackendSnapshotMethod() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + final Exception failingException = new Exception("Test exception"); + + final CloseableRegistry closeableRegistry = new CloseableRegistry(); + + RunnableFuture futureKeyGroupStateHandle = mock(RunnableFuture.class); + RunnableFuture futureOperatorStateHandle = mock(RunnableFuture.class); + + StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class); + when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyGroupStateHandle); + when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle); + + OperatorSnapshotResult operatorSnapshotResult = spy(new OperatorSnapshotResult()); + + whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context); + whenNew(OperatorSnapshotResult.class).withAnyArguments().thenReturn(operatorSnapshotResult); + + CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class); + StreamTask> containingTask = mock(StreamTask.class); + when(containingTask.getCancelables()).thenReturn(closeableRegistry); + + AbstractStreamOperator operator = mock(AbstractStreamOperator.class); + when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenCallRealMethod(); + doReturn(containingTask).when(operator).getContainingTask(); + + RunnableFuture futureManagedOperatorStateHandle = mock(RunnableFuture.class); + + OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class); + when(operatorStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenReturn(futureManagedOperatorStateHandle); + + AbstractKeyedStateBackend keyedStateBackend = mock(AbstractKeyedStateBackend.class); + when(keyedStateBackend.snapshot(eq(checkpointId), eq(timestamp), eq(streamFactory))).thenThrow(failingException); + + Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend); + Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend); + + try { + operator.snapshotState(checkpointId, timestamp, streamFactory); + fail("Exception expected."); + } catch (Exception e) { + assertEquals(failingException, e.getCause()); + } + + // verify that the context has been closed, the operator snapshot result has been cancelled + // and that all futures have been cancelled. + verify(context).close(); + verify(operatorSnapshotResult).cancel(); + + verify(futureKeyGroupStateHandle).cancel(anyBoolean()); + verify(futureOperatorStateHandle).cancel(anyBoolean()); + verify(futureKeyGroupStateHandle).cancel(anyBoolean()); + } + /** * Extracts the result values form the test harness and clear the output queue. */ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java index 7e0ce5b02bd7c..490df52d868c1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResultTest.java @@ -20,80 +20,59 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.junit.Assert; +import org.apache.flink.util.TestLogger; import org.junit.Test; -import java.util.concurrent.ExecutionException; import java.util.concurrent.RunnableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -public class OperatorSnapshotResultTest { +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; +public class OperatorSnapshotResultTest extends TestLogger { + + /** + * Tests that all runnable futures in an OperatorSnapshotResult are properly cancelled and if + * the StreamStateHandle result is retrievable that the state handle are discarded. + */ @Test - public void testCancel() { + public void testCancelAndCleanup() throws Exception { OperatorSnapshotResult operatorSnapshotResult = new OperatorSnapshotResult(); operatorSnapshotResult.cancel(); - RunnableFuture keyedStateManagedFuture = new TestRunnableFuture<>(); - RunnableFuture keyedStateRawFuture = new TestRunnableFuture<>(); - RunnableFuture operatorStateManagedFuture = new TestRunnableFuture<>(); - RunnableFuture operatorStateRawFuture = new TestRunnableFuture<>(); - - operatorSnapshotResult = new OperatorSnapshotResult( - keyedStateManagedFuture, - keyedStateRawFuture, - operatorStateManagedFuture, - operatorStateRawFuture); - - operatorSnapshotResult.cancel(); - - Assert.assertTrue(keyedStateManagedFuture.isCancelled()); - Assert.assertTrue(keyedStateRawFuture.isCancelled()); - Assert.assertTrue(operatorStateManagedFuture.isCancelled()); - Assert.assertTrue(operatorStateRawFuture.isCancelled()); - - } + KeyGroupsStateHandle keyedManagedStateHandle = mock(KeyGroupsStateHandle.class); + RunnableFuture keyedStateManagedFuture = mock(RunnableFuture.class); + when(keyedStateManagedFuture.get()).thenReturn(keyedManagedStateHandle); - static final class TestRunnableFuture implements RunnableFuture { + KeyGroupsStateHandle keyedRawStateHandle = mock(KeyGroupsStateHandle.class); + RunnableFuture keyedStateRawFuture = mock(RunnableFuture.class); + when(keyedStateRawFuture.get()).thenReturn(keyedRawStateHandle); - private boolean canceled; + OperatorStateHandle operatorManagedStateHandle = mock(OperatorStateHandle.class); + RunnableFuture operatorStateManagedFuture = mock(RunnableFuture.class); + when(operatorStateManagedFuture.get()).thenReturn(operatorManagedStateHandle); - public TestRunnableFuture() { - this.canceled = false; - } + OperatorStateHandle operatorRawStateHandle = mock(OperatorStateHandle.class); + RunnableFuture operatorStateRawFuture = mock(RunnableFuture.class); + when(operatorStateRawFuture.get()).thenReturn(operatorRawStateHandle); - @Override - public void run() { - - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return canceled = true; - } - - @Override - public boolean isCancelled() { - return canceled; - } + operatorSnapshotResult = new OperatorSnapshotResult( + keyedStateManagedFuture, + keyedStateRawFuture, + operatorStateManagedFuture, + operatorStateRawFuture); - @Override - public boolean isDone() { - return false; - } + operatorSnapshotResult.cancel(); - @Override - public T get() throws InterruptedException, ExecutionException { - return null; - } + verify(keyedStateManagedFuture).cancel(true); + verify(keyedStateRawFuture).cancel(true); + verify(operatorStateManagedFuture).cancel(true); + verify(operatorStateRawFuture).cancel(true); - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return null; - } + verify(keyedManagedStateHandle).discardState(); + verify(keyedRawStateHandle).discardState(); + verify(operatorManagedStateHandle).discardState(); + verify(operatorRawStateHandle).discardState(); } - - -} \ No newline at end of file +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java index 2b2df4c06b04a..277ced5b908bc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java @@ -25,11 +25,22 @@ import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class StateSnapshotContextSynchronousImplTest { +import java.io.Closeable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; + +public class StateSnapshotContextSynchronousImplTest extends TestLogger { private StateSnapshotContextSynchronousImpl snapshotContext; @@ -43,8 +54,8 @@ public void setUp() throws Exception { @Test public void testMetaData() { - Assert.assertEquals(42, snapshotContext.getCheckpointId()); - Assert.assertEquals(4711, snapshotContext.getCheckpointTimestamp()); + assertEquals(42, snapshotContext.getCheckpointId()); + assertEquals(4711, snapshotContext.getCheckpointTimestamp()); } @Test @@ -58,4 +69,58 @@ public void testCreateRawOperatorStateOutput() throws Exception { OperatorStateCheckpointOutputStream stream = snapshotContext.getRawOperatorStateOutput(); Assert.assertNotNull(stream); } -} \ No newline at end of file + + /** + * Tests that closing the StateSnapshotContextSynchronousImpl will also close the associated + * output streams. + */ + @Test + public void testStreamClosingWhenClosing() throws Exception { + long checkpointId = 42L; + long checkpointTimestamp = 1L; + + CheckpointStreamFactory.CheckpointStateOutputStream outputStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + CheckpointStreamFactory.CheckpointStateOutputStream outputStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + + CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class); + when(streamFactory.createCheckpointStateOutputStream(eq(checkpointId), eq(checkpointTimestamp))).thenReturn(outputStream1, outputStream2); + + InsightCloseableRegistry closableRegistry = new InsightCloseableRegistry(); + + KeyGroupRange keyGroupRange = new KeyGroupRange(0, 2); + + StateSnapshotContextSynchronousImpl context = new StateSnapshotContextSynchronousImpl( + checkpointId, + checkpointTimestamp, + streamFactory, + keyGroupRange, + closableRegistry); + + // creating the output streams + context.getRawKeyedOperatorStateOutput(); + context.getRawOperatorStateOutput(); + + verify(streamFactory, times(2)).createCheckpointStateOutputStream(eq(checkpointId), eq(checkpointTimestamp)); + + assertEquals(2, closableRegistry.size()); + assertTrue(closableRegistry.contains(outputStream1)); + assertTrue(closableRegistry.contains(outputStream2)); + + context.close(); + + verify(outputStream1).close(); + verify(outputStream2).close(); + + assertEquals(0, closableRegistry.size()); + } + + static final class InsightCloseableRegistry extends CloseableRegistry { + public int size() { + return closeableToRef.size(); + } + + public boolean contains(Closeable closeable) { + return closeableToRef.containsKey(closeable); + } + } +} From 5eb4c2ff00a3818c53bac6c440d83bff0be8501a Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 20 Jan 2017 14:28:44 +0100 Subject: [PATCH 2/2] [FLINK-5229] [state] Cleanup of operator snapshots if subsequent operator snapshots fail This PR adds operator state cleanup to the StreamTask class. If a stream task contains multiple stream operators, then every operator is checkpointed. In case that a snapshot operation fails all state handles and OperatorSnapshotResults belonging to previous operators have to be freed. Add test cases for failing checkpoint operations in StreamTask --- .../streaming/runtime/tasks/StreamTask.java | 47 ++++- .../runtime/tasks/StreamTaskTest.java | 182 +++++++++++++++++- 2 files changed, 223 insertions(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 775475dc47669..f0af5bd81c05e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -530,7 +530,7 @@ public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws E // propagate exceptions only if the task is still in "running" state if (isRunning) { throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + - "for operator " + getName() + '.', e); + " for operator " + getName() + '.', e); } else { LOG.debug("Could not perform checkpoint {} for operator {} while the " + "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e); @@ -953,6 +953,27 @@ public void run() { owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis); } } catch (Exception e) { + // clean up ongoing operator snapshot results and non partitioned state handles + for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) { + if (operatorSnapshotResult != null) { + try { + operatorSnapshotResult.cancel(); + } catch (Exception cancelException) { + e.addSuppressed(cancelException); + } + } + } + + for (StreamStateHandle nonPartitionedStateHandle : nonPartitionedStateHandles) { + if (nonPartitionedStateHandle != null) { + try { + nonPartitionedStateHandle.discardState(); + } catch (Exception discardException) { + e.addSuppressed(discardException); + } + } + } + // registers the exception and tries to fail the whole task AsynchronousException asyncException = new AsynchronousException( new Exception( @@ -978,6 +999,18 @@ public void close() { } } } + + // discard non partitioned state handles + for (StreamStateHandle nonPartitionedStateHandle : nonPartitionedStateHandles) { + if (nonPartitionedStateHandle != null) { + try { + nonPartitionedStateHandle.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard non partitioned state handle " + + "in async checkpoint runnable.", e); + } + } + } } } @@ -1057,6 +1090,18 @@ public void executeCheckpointing() throws Exception { } } + // Cleanup non partitioned state handles + for (StreamStateHandle nonPartitionedState : nonPartitionedStates) { + if (nonPartitionedState != null) { + try { + nonPartitionedState.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard a non partitioned " + + "state. This might leave some orphaned files behind.", e); + } + } + } + if (LOG.isDebugEnabled()) { LOG.debug("{} - did NOT finish synchronous part of checkpoint {}." + "Alignment duration: {} ms, snapshot duration {} ms", diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index b55c28803c0ee..ffdb09d552fd6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -20,15 +20,19 @@ import akka.dispatch.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.execution.Environment; @@ -51,9 +55,12 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateBackendFactory; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; @@ -65,16 +72,20 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.internal.util.reflection.Whitebox; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import scala.concurrent.Await; @@ -90,7 +101,9 @@ import java.util.Collections; import java.util.Comparator; import java.util.PriorityQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -98,10 +111,14 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; -public class StreamTaskTest { +public class StreamTaskTest extends TestLogger { private static OneShotLatch SYNC_LATCH; @@ -170,8 +187,8 @@ public void testStateBackendLoadingAndClosing() throws Exception { task.getExecutingThread().join(); // ensure that the state backends are closed - Mockito.verify(StateBackendTestSource.operatorStateBackend).close(); - Mockito.verify(StateBackendTestSource.keyedStateBackend).close(); + verify(StateBackendTestSource.operatorStateBackend).close(); + verify(StateBackendTestSource.keyedStateBackend).close(); assertEquals(ExecutionState.FINISHED, task.getExecutionState()); } @@ -194,8 +211,8 @@ public void testStateBackendClosingOnFailure() throws Exception { task.getExecutingThread().join(); // ensure that the state backends are closed - Mockito.verify(StateBackendTestSource.operatorStateBackend).close(); - Mockito.verify(StateBackendTestSource.keyedStateBackend).close(); + verify(StateBackendTestSource.operatorStateBackend).close(); + verify(StateBackendTestSource.keyedStateBackend).close(); assertEquals(ExecutionState.FAILED, task.getExecutionState()); } @@ -240,6 +257,161 @@ public void testCancellationFailsWithBlockingLock() throws Exception { assertEquals(ExecutionState.CANCELED, task.getExecutionState()); } + @Test + public void testFailingCheckpointStreamOperator() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + TaskInfo mockTaskInfo = mock(TaskInfo.class); + when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); + when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); + Environment mockEnvironment = mock(Environment.class); + when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo); + + StreamTask> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS); + CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); + streamTask.setEnvironment(mockEnvironment); + + StreamOperator streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + StreamOperator streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + StreamOperator streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + + OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class); + OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class); + + final Exception testException = new Exception("Test exception"); + + when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1); + when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2); + when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenThrow(testException); + + StreamOperator[] streamOperators = {streamOperator1, streamOperator2, streamOperator3}; + + OperatorChain> operatorChain = mock(OperatorChain.class); + when(operatorChain.getAllOperators()).thenReturn(streamOperators); + + StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class); + StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class); + StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class); + + CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + + when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1); + when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2); + when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3); + + CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class); + when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn( + outStream1, outStream2, outStream3); + + AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class); + when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory); + + Whitebox.setInternalState(streamTask, "isRunning", true); + Whitebox.setInternalState(streamTask, "lock", new Object()); + Whitebox.setInternalState(streamTask, "operatorChain", operatorChain); + Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry()); + Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration())); + Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend); + + try { + streamTask.triggerCheckpoint(checkpointMetaData); + fail("Expected test exception here."); + } catch (Exception e) { + assertEquals(testException, e.getCause()); + } + + verify(operatorSnapshotResult1).cancel(); + verify(operatorSnapshotResult2).cancel(); + + verify(streamStateHandle1).discardState(); + verify(streamStateHandle2).discardState(); + verify(streamStateHandle3).discardState(); + } + + /** + * Tests that in case of a failing AsyncCheckpointRunnable all operator snapshot results are + * cancelled and all non partitioned state handles are discarded. + */ + @Test + public void testFailingAsyncCheckpointRunnable() throws Exception { + final long checkpointId = 42L; + final long timestamp = 1L; + + TaskInfo mockTaskInfo = mock(TaskInfo.class); + when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); + when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); + Environment mockEnvironment = mock(Environment.class); + when(mockEnvironment.getTaskInfo()).thenReturn(mockTaskInfo); + + StreamTask> streamTask = mock(StreamTask.class, Mockito.CALLS_REAL_METHODS); + CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); + streamTask.setEnvironment(mockEnvironment); + + StreamOperator streamOperator1 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + StreamOperator streamOperator2 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + StreamOperator streamOperator3 = mock(StreamOperator.class, withSettings().extraInterfaces(StreamCheckpointedOperator.class)); + + OperatorSnapshotResult operatorSnapshotResult1 = mock(OperatorSnapshotResult.class); + OperatorSnapshotResult operatorSnapshotResult2 = mock(OperatorSnapshotResult.class); + OperatorSnapshotResult operatorSnapshotResult3 = mock(OperatorSnapshotResult.class); + + RunnableFuture failingFuture = mock(RunnableFuture.class); + when(failingFuture.get()).thenThrow(new ExecutionException(new Exception("Test exception"))); + + when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture); + + when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1); + when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2); + when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult3); + + StreamOperator[] streamOperators = {streamOperator1, streamOperator2, streamOperator3}; + + OperatorChain> operatorChain = mock(OperatorChain.class); + when(operatorChain.getAllOperators()).thenReturn(streamOperators); + + StreamStateHandle streamStateHandle1 = mock(StreamStateHandle.class); + StreamStateHandle streamStateHandle2 = mock(StreamStateHandle.class); + StreamStateHandle streamStateHandle3 = mock(StreamStateHandle.class); + + CheckpointStreamFactory.CheckpointStateOutputStream outStream1 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + CheckpointStreamFactory.CheckpointStateOutputStream outStream2 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + CheckpointStreamFactory.CheckpointStateOutputStream outStream3 = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class); + + when(outStream1.closeAndGetHandle()).thenReturn(streamStateHandle1); + when(outStream2.closeAndGetHandle()).thenReturn(streamStateHandle2); + when(outStream3.closeAndGetHandle()).thenReturn(streamStateHandle3); + + CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class); + when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn( + outStream1, outStream2, outStream3); + + AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class); + when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory); + + Whitebox.setInternalState(streamTask, "isRunning", true); + Whitebox.setInternalState(streamTask, "lock", new Object()); + Whitebox.setInternalState(streamTask, "operatorChain", operatorChain); + Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry()); + Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", MoreExecutors.newDirectExecutorService()); + Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration())); + Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend); + + streamTask.triggerCheckpoint(checkpointMetaData); + + verify(streamTask).handleAsyncException(anyString(), any(Throwable.class)); + + verify(operatorSnapshotResult1).cancel(); + verify(operatorSnapshotResult2).cancel(); + verify(operatorSnapshotResult3).cancel(); + + verify(streamStateHandle1).discardState(); + verify(streamStateHandle2).discardState(); + verify(streamStateHandle3).discardState(); + } + // ------------------------------------------------------------------------ // Test Utilities // ------------------------------------------------------------------------