From dea4daa1e8490c882ad15f9debe3a00c10836129 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 1 Mar 2017 14:08:35 +0100 Subject: [PATCH 1/4] [FLINK-5940] [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method The ZooKeeperCompletedCheckpointStore only tries to recover the latest completed checkpoint even though it might have read older checkpoint state handles from ZooKeeper. In order to deal with corrupted state handles, this commit changes the behaviour such that the completed checkpoint store adds all read retrievable state handles from ZooKeeper and upon request of the latest checkpoint it will return the latest completed checkpoint which could be retrieved from the state handles. Broken state handles are removed from the completed checkpoint store and ZooKeeper. --- .../ZooKeeperCompletedCheckpointStore.java | 96 +++++++----- ...ZooKeeperCompletedCheckpointStoreTest.java | 144 ++++++++++++++++++ 2 files changed, 201 insertions(+), 39 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index fdd0d409643f8..c8d2c0fced97a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -35,6 +35,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.ConcurrentModificationException; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Executor; @@ -157,36 +158,8 @@ public void recover() throws Exception { LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints); - if (numberOfInitialCheckpoints > 0) { - // Take the last one. This is the latest checkpoints, because path names are strictly - // increasing (checkpoint ID). - Tuple2, String> latest = initialCheckpoints - .get(numberOfInitialCheckpoints - 1); - - CompletedCheckpoint latestCheckpoint; - long checkpointId = pathToCheckpointId(latest.f1); - - LOG.info("Trying to retrieve checkpoint {}.", checkpointId); - - try { - latestCheckpoint = latest.f0.retrieveState(); - } catch (Exception e) { - throw new Exception("Could not retrieve the completed checkpoint " + checkpointId + - " from the state storage.", e); - } - - checkpointStateHandles.add(latest); - - LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint); - - for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) { - try { - removeSubsumed(initialCheckpoints.get(i)); - } - catch (Exception e) { - LOG.error("Failed to discard checkpoint", e); - } - } + for (Tuple2, String> checkpoint : initialCheckpoints) { + checkpointStateHandles.add(checkpoint); } } @@ -208,7 +181,7 @@ public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path)); // Everything worked, let's remove a previous checkpoint if necessary. - if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) { + while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) { removeSubsumed(checkpointStateHandles.removeFirst()); } @@ -221,7 +194,21 @@ public CompletedCheckpoint getLatestCheckpoint() throws Exception { return null; } else { - return checkpointStateHandles.getLast().f0.retrieveState(); + while(!checkpointStateHandles.isEmpty()) { + Tuple2, String> checkpointStateHandle = checkpointStateHandles.peekLast(); + + try { + return retrieveCompletedCheckpoint(checkpointStateHandle); + } catch (Exception e) { + LOG.warn("Could not retrieve latest checkpoint. Removing it from " + + "the completed checkpoint store.", e); + + // remove the checkpoint with broken state handle + removeBrokenStateHandle(checkpointStateHandles.pollLast()); + } + } + + return null; } } @@ -229,8 +216,21 @@ public CompletedCheckpoint getLatestCheckpoint() throws Exception { public List getAllCheckpoints() throws Exception { List checkpoints = new ArrayList<>(checkpointStateHandles.size()); - for (Tuple2, String> stateHandle : checkpointStateHandles) { - checkpoints.add(stateHandle.f0.retrieveState()); + Iterator, String>> stateHandleIterator = checkpointStateHandles.iterator(); + + while (stateHandleIterator.hasNext()) { + Tuple2, String> stateHandlePath = stateHandleIterator.next(); + + try { + checkpoints.add(retrieveCompletedCheckpoint(stateHandlePath)); + } catch (Exception e) { + LOG.warn("Could not retrieve checkpoint. Removing it from the completed " + + "checkpoint store.", e); + + // remove the checkpoint with broken state handle + stateHandleIterator.remove(); + removeBrokenStateHandle(stateHandlePath); + } } return checkpoints; @@ -298,6 +298,10 @@ public Void call() throws Exception { remove(stateHandleAndPath, action); } + private void removeBrokenStateHandle(final Tuple2, String> stateHandleAndPath) throws Exception { + remove(stateHandleAndPath, null); + } + /** * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle. */ @@ -315,11 +319,13 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex if (event.getResultCode() == 0) { Exception exception = null; - try { - action.call(); - } catch (Exception e) { - exception = new Exception("Could not execute callable action " + - "for checkpoint " + checkpointId + '.', e); + if (null != action) { + try { + action.call(); + } catch (Exception e) { + exception = new Exception("Could not execute callable action " + + "for checkpoint " + checkpointId + '.', e); + } } try { @@ -393,4 +399,16 @@ protected static long pathToCheckpointId(String path) { return -1L; } } + + private static CompletedCheckpoint retrieveCompletedCheckpoint(Tuple2, String> stateHandlePath) throws Exception { + long checkpointId = pathToCheckpointId(stateHandlePath.f1); + + LOG.info("Trying to retrieve checkpoint {}.", checkpointId); + + try { + return stateHandlePath.f0.retrieveState(); + } catch (Exception e) { + throw new Exception("Could not retrieve checkpoint " + checkpointId + ". The state handle seems to be broken.", e); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 6ee014128a53b..5bf7c3cdae73a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -18,11 +18,47 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.BackgroundVersionable; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.DeleteBuilder; +import org.apache.curator.framework.api.Pathable; +import org.apache.curator.utils.EnsurePath; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.util.TestLogger; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; +@RunWith(PowerMockRunner.class) +@PrepareForTest(ZooKeeperCompletedCheckpointStore.class) public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { @Test @@ -33,4 +69,112 @@ public void testPathConversion() { assertEquals(checkpointId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path)); } + + /** + * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper + * and ignores those which cannot be retrieved via their state handles. + */ + @Test + public void testCheckpointRecovery() throws Exception { + final List, String>> checkpointsInZooKeeper = new ArrayList<>(4); + + final CompletedCheckpoint completedCheckpoint1 = mock(CompletedCheckpoint.class); + when(completedCheckpoint1.getCheckpointID()).thenReturn(1L); + final CompletedCheckpoint completedCheckpoint2 = mock(CompletedCheckpoint.class); + when(completedCheckpoint2.getCheckpointID()).thenReturn(2L); + + final Collection expectedCheckpointIds = new HashSet<>(2); + expectedCheckpointIds.add(1L); + expectedCheckpointIds.add(2L); + + final RetrievableStateHandle failingRetrievableStateHandle = mock(RetrievableStateHandle.class); + when(failingRetrievableStateHandle.retrieveState()).thenThrow(new Exception("Test exception")); + + final RetrievableStateHandle retrievableStateHandle1 = mock(RetrievableStateHandle.class); + when(retrievableStateHandle1.retrieveState()).thenReturn(completedCheckpoint1); + + final RetrievableStateHandle retrievableStateHandle2 = mock(RetrievableStateHandle.class); + when(retrievableStateHandle2.retrieveState()).thenReturn(completedCheckpoint2); + + checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "foobar1")); + checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "failing1")); + checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "foobar2")); + checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "failing1")); + + final CuratorFramework client = mock(CuratorFramework.class); + final RetrievableStateStorageHelper storageHelperMock = mock(RetrievableStateStorageHelper.class); + + ZooKeeperStateHandleStore zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor())); + whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock); + doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByName(); + + final int numCheckpointsToRetain = 1; + final EnsurePath ensurePathMock = mock(EnsurePath.class); + final DeleteBuilder deleteBuilderMock = mock(DeleteBuilder.class); + final BackgroundVersionable backgroundVersionableMock = mock(BackgroundVersionable.class); + + when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock); + when(client.delete()).thenReturn(deleteBuilderMock); + when(deleteBuilderMock.deletingChildrenIfNeeded()).thenReturn(backgroundVersionableMock); + when(backgroundVersionableMock.inBackground(any(BackgroundCallback.class), any(Executor.class))).thenAnswer(new Answer>() { + @Override + public Pathable answer(InvocationOnMock invocation) throws Throwable { + final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0]; + + final CuratorEvent curatorEventMock = mock(CuratorEvent.class); + when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE); + when(curatorEventMock.getResultCode()).thenReturn(0); + + Pathable result = mock(Pathable.class); + + when(result.forPath(anyString())).thenAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + + callback.processResult(client, curatorEventMock); + + return null; + } + }); + + return result; + } + }); + + final String checkpointsPath = "foobar"; + final RetrievableStateStorageHelper stateSotrage = mock(RetrievableStateStorageHelper.class); + + ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore( + numCheckpointsToRetain, + client, + checkpointsPath, + stateSotrage, + Executors.directExecutor()); + + zooKeeperCompletedCheckpointStore.recover(); + + CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(); + + // check that we return the latest retrievable checkpoint + // this should remove the latest checkpoint because it is broken + assertEquals(completedCheckpoint2.getCheckpointID(), latestCompletedCheckpoint.getCheckpointID()); + + // this should remove the second broken checkpoint because we're iterating over all checkpoints + List completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints(); + + Collection actualCheckpointIds = new HashSet<>(completedCheckpoints.size()); + + for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { + actualCheckpointIds.add(completedCheckpoint.getCheckpointID()); + } + + assertEquals(expectedCheckpointIds, actualCheckpointIds); + + // check that we did not discard any of the state handles which were retrieved + verify(retrievableStateHandle1, never()).discardState(); + verify(retrievableStateHandle2, never()).discardState(); + + // check that we have discarded the state handles which could not be retrieved + verify(failingRetrievableStateHandle, times(2)).discardState(); + } } From b554f2510aa1703502c5105947ff83c9a218adde Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 2 Mar 2017 12:06:44 +0100 Subject: [PATCH 2/4] Fix ZooKeeperCompletedCheckpointStoreITCase.testRecover --- ...oKeeperCompletedCheckpointStoreITCase.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 046adba10d32a..22c5d45a43e54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -27,11 +27,10 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; import java.io.Serializable; -import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -74,7 +73,9 @@ public RetrievableStateHandle store(CompletedCheckpoint sta // --------------------------------------------------------------------------------------------- /** - * Tests that older checkpoints are cleaned up at startup. + * Tests that older checkpoints are not cleaned up right away when recovering. Only after + * another checkpointed has been completed the old checkpoints exceeding the number of + * checkpoints to retain will be removed. */ @Test public void testRecover() throws Exception { @@ -96,19 +97,20 @@ public void testRecover() throws Exception { // Recover checkpoints.recover(); - // Only the latest one should be in ZK - Deadline deadline = new FiniteDuration(1, TimeUnit.MINUTES).fromNow(); + assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size()); + assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); + assertEquals(expected[2], checkpoints.getLatestCheckpoint()); - // Retry this operation, because removal is asynchronous - while (deadline.hasTimeLeft() && ZooKeeper.getClient() - .getChildren().forPath(CheckpointsPath).size() != 1) { + List expectedCheckpoints = new ArrayList<>(3); + expectedCheckpoints.add(expected[1]); + expectedCheckpoints.add(expected[2]); + expectedCheckpoints.add(createCheckpoint(3)); - Thread.sleep(Math.min(100, deadline.timeLeft().toMillis())); - } + checkpoints.addCheckpoint(expectedCheckpoints.get(2)); - assertEquals(1, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size()); - assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints()); - assertEquals(expected[2], checkpoints.getLatestCheckpoint()); + List actualCheckpoints = checkpoints.getAllCheckpoints(); + + assertEquals(expectedCheckpoints, actualCheckpoints); } /** From 3eeb28118e50a0b899a99f2ff6ac78386d0176e5 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 13 Mar 2017 14:09:00 +0100 Subject: [PATCH 3/4] Move ZooKeeperStateHandleStore out of ZooKeeperCompletedCheckpointStore constructor --- .../ZooKeeperCompletedCheckpointStore.java | 10 +++++++--- ...oKeeperCompletedCheckpointStoreITCase.java | 2 +- ...ZooKeeperCompletedCheckpointStoreTest.java | 19 ++++++++----------- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index c8d2c0fced97a..84d0da528f1db 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -189,7 +189,7 @@ public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { } @Override - public CompletedCheckpoint getLatestCheckpoint() throws Exception { + public CompletedCheckpoint getLatestCheckpoint() { if (checkpointStateHandles.isEmpty()) { return null; } @@ -203,8 +203,12 @@ public CompletedCheckpoint getLatestCheckpoint() throws Exception { LOG.warn("Could not retrieve latest checkpoint. Removing it from " + "the completed checkpoint store.", e); - // remove the checkpoint with broken state handle - removeBrokenStateHandle(checkpointStateHandles.pollLast()); + try { + // remove the checkpoint with broken state handle + removeBrokenStateHandle(checkpointStateHandles.pollLast()); + } catch (Exception removeException) { + LOG.warn("Could not remove the latest checkpoint with a broken state handle.", removeException); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 22c5d45a43e54..625999a3343c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -65,7 +65,7 @@ protected CompletedCheckpointStore createCompletedCheckpoints( ZooKeeper.createClient(), CheckpointsPath, new RetrievableStateStorageHelper() { @Override public RetrievableStateHandle store(CompletedCheckpoint state) throws Exception { - return new HeapRetrievableStateHandle(state); + return new HeapRetrievableStateHandle<>(state); } }, Executors.directExecutor()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 5bf7c3cdae73a..37324a7f28875 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -33,11 +33,8 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.util.TestLogger; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; import java.util.Collection; @@ -55,10 +52,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.whenNew; -@RunWith(PowerMockRunner.class) -@PrepareForTest(ZooKeeperCompletedCheckpointStore.class) public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { @Test @@ -96,19 +90,22 @@ public void testCheckpointRecovery() throws Exception { final RetrievableStateHandle retrievableStateHandle2 = mock(RetrievableStateHandle.class); when(retrievableStateHandle2.retrieveState()).thenReturn(completedCheckpoint2); - checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "foobar1")); - checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "failing1")); - checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "foobar2")); - checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "failing1")); + checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1")); + checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1")); + checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2")); + checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2")); final CuratorFramework client = mock(CuratorFramework.class); final RetrievableStateStorageHelper storageHelperMock = mock(RetrievableStateStorageHelper.class); ZooKeeperStateHandleStore zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor())); - whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock); doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByName(); final int numCheckpointsToRetain = 1; + + // Mocking for the delete operation on the CuratorFramework client + // It assures that the callback is executed synchronously + final EnsurePath ensurePathMock = mock(EnsurePath.class); final DeleteBuilder deleteBuilderMock = mock(DeleteBuilder.class); final BackgroundVersionable backgroundVersionableMock = mock(BackgroundVersionable.class); From 8ff058a3fb9dac6306d91a1a27bf5567f580d7bd Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 13 Mar 2017 17:52:49 +0100 Subject: [PATCH 4/4] Fix test case --- .../checkpoint/ZooKeeperCompletedCheckpointStoreTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 37324a7f28875..6124dd31db79a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -33,8 +33,11 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.util.TestLogger; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; import java.util.Collection; @@ -52,7 +55,10 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; +@RunWith(PowerMockRunner.class) +@PrepareForTest(ZooKeeperCompletedCheckpointStore.class) public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { @Test @@ -99,6 +105,7 @@ public void testCheckpointRecovery() throws Exception { final RetrievableStateStorageHelper storageHelperMock = mock(RetrievableStateStorageHelper.class); ZooKeeperStateHandleStore zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor())); + whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock); doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByName(); final int numCheckpointsToRetain = 1;