From ccb3ac8f2b64a357275d2967fcd440947becd272 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 17 May 2017 14:52:04 +0200 Subject: [PATCH] [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes In order to guard against deletions of ZooKeeper nodes which are still being used by a different ZooKeeperStateHandleStore, we have to introduce a locking mechanism. Only after all ZooKeeperStateHandleStores have released their lock, the ZNode is allowed to be deleted. THe locking mechanism is implemented via ephemeral child nodes of the respective ZooKeeper node. Whenever a ZooKeeperStateHandleStore wants to lock a ZNode, thus, protecting it from being deleted, it creates an ephemeral child node. The node's name is unique to the ZooKeeperStateHandleStore instance. The delete operations will then only delete the node if it does not have any children associated. In order to guard against oprhaned lock nodes, they are created as ephemeral nodes. This means that they will be deleted by ZooKeeper once the connection of the ZooKeeper client which created the node timed out. --- .../store/ZooKeeperMesosWorkerStore.java | 8 +- .../ZooKeeperCompletedCheckpointStore.java | 150 +++--- .../ZooKeeperSubmittedJobGraphStore.java | 50 +- .../zookeeper/ZooKeeperStateHandleStore.java | 419 ++++++++++++---- .../CompletedCheckpointStoreTest.java | 9 + ...oKeeperCompletedCheckpointStoreITCase.java | 133 +++-- ...ZooKeeperCompletedCheckpointStoreTest.java | 11 +- ...ava => ZooKeeperStateHandleStoreTest.java} | 465 ++++++++++++------ 8 files changed, 854 insertions(+), 391 deletions(-) rename flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/{ZooKeeperStateHandleStoreITCase.java => ZooKeeperStateHandleStoreTest.java} (55%) diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java index 42abd4cdd82d7..663ce566d6d75 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java @@ -88,7 +88,7 @@ public void stop(boolean cleanup) throws Exception { totalTaskCountInZooKeeper.close(); if(cleanup) { - workersInZooKeeper.removeAndDiscardAllState(); + workersInZooKeeper.releaseAndTryRemoveAll(); } isRunning = false; @@ -169,7 +169,7 @@ public List recoverWorkers() throws Exception { synchronized (startStopLock) { verifyIsRunning(); - List, String>> handles = workersInZooKeeper.getAll(); + List, String>> handles = workersInZooKeeper.getAllAndLock(); if(handles.size() != 0) { List workers = new ArrayList<>(handles.size()); @@ -199,7 +199,7 @@ public void putWorker(MesosWorkerStore.Worker worker) throws Exception { int currentVersion = workersInZooKeeper.exists(path); if (currentVersion == -1) { try { - workersInZooKeeper.add(path, worker); + workersInZooKeeper.addAndLock(path, worker); LOG.debug("Added {} in ZooKeeper.", worker); } catch (KeeperException.NodeExistsException ex) { throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex); @@ -227,7 +227,7 @@ public boolean removeWorker(Protos.TaskID taskID) throws Exception { return false; } - workersInZooKeeper.removeAndDiscardState(path); + workersInZooKeeper.releaseAndTryRemove(path); LOG.debug("Removed worker {} from ZooKeeper.", taskID); return true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 95cfb0f977ff0..084d93e7ca24d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -19,9 +19,6 @@ package org.apache.flink.runtime.checkpoint; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.utils.ZKPaths; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -34,12 +31,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkArgument; @@ -155,7 +152,7 @@ public void recover() throws Exception { List, String>> initialCheckpoints; while (true) { try { - initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName(); + initialCheckpoints = checkpointsInZooKeeper.getAllSortedByNameAndLock(); break; } catch (ConcurrentModificationException e) { @@ -178,7 +175,7 @@ public void recover() throws Exception { "checkpoint store.", e); // remove the checkpoint with broken state handle - removeBrokenStateHandle(checkpointStateHandle); + removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0); } if (completedCheckpoint != null) { @@ -201,7 +198,7 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception final RetrievableStateHandle stateHandle; // First add the new one. If it fails, we don't want to loose existing data. - stateHandle = checkpointsInZooKeeper.add(path, checkpoint); + stateHandle = checkpointsInZooKeeper.addAndLock(path, checkpoint); checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path)); @@ -211,7 +208,7 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception // Everything worked, let's remove a previous checkpoint if necessary. while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) { try { - removeSubsumed(checkpointStateHandles.removeFirst(), sharedStateRegistry); + removeSubsumed(checkpointStateHandles.removeFirst().f1, sharedStateRegistry); } catch (Exception e) { LOG.warn("Failed to subsume the old checkpoint", e); } @@ -237,7 +234,8 @@ public CompletedCheckpoint getLatestCheckpoint() { try { // remove the checkpoint with broken state handle - removeBrokenStateHandle(checkpointStateHandles.pollLast()); + Tuple2, String> checkpoint = checkpointStateHandles.pollLast(); + removeBrokenStateHandle(checkpoint.f1, checkpoint.f0); } catch (Exception removeException) { LOG.warn("Could not remove the latest checkpoint with a broken state handle.", removeException); } @@ -265,7 +263,7 @@ public List getAllCheckpoints() throws Exception { // remove the checkpoint with broken state handle stateHandleIterator.remove(); - removeBrokenStateHandle(stateHandlePath); + removeBrokenStateHandle(stateHandlePath.f1, stateHandlePath.f0); } } @@ -289,7 +287,7 @@ public void shutdown(JobStatus jobStatus) throws Exception { for (Tuple2, String> checkpoint : checkpointStateHandles) { try { - removeShutdown(checkpoint, jobStatus, sharedStateRegistry); + removeShutdown(checkpoint.f1, jobStatus, sharedStateRegistry); } catch (Exception e) { LOG.error("Failed to discard checkpoint.", e); } @@ -306,117 +304,87 @@ public void shutdown(JobStatus jobStatus) throws Exception { // Clear the local handles, but don't remove any state checkpointStateHandles.clear(); + + // Release the state handle locks in ZooKeeper such that they can be deleted + checkpointsInZooKeeper.releaseAll(); } } // ------------------------------------------------------------------------ private void removeSubsumed( - final Tuple2, String> stateHandleAndPath, + final String pathInZooKeeper, final SharedStateRegistry sharedStateRegistry) throws Exception { - Callable action = new Callable() { + ZooKeeperStateHandleStore.RemoveCallback action = new ZooKeeperStateHandleStore.RemoveCallback() { @Override - public Void call() throws Exception { - CompletedCheckpoint completedCheckpoint = retrieveCompletedCheckpoint(stateHandleAndPath); - - if (completedCheckpoint != null) { - completedCheckpoint.discardOnSubsume(sharedStateRegistry); - } + public void apply(@Nullable RetrievableStateHandle value) throws FlinkException { + if (value != null) { + final CompletedCheckpoint completedCheckpoint; + try { + completedCheckpoint = value.retrieveState(); + } catch (Exception e) { + throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e); + } - return null; + if (completedCheckpoint != null) { + try { + completedCheckpoint.discardOnSubsume(sharedStateRegistry); + } catch (Exception e) { + throw new FlinkException("Could not discard the completed checkpoint on subsume.", e); + } + } + } } }; - remove(stateHandleAndPath, action); + checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, action); } private void removeShutdown( - final Tuple2, String> stateHandleAndPath, + final String pathInZooKeeper, final JobStatus jobStatus, final SharedStateRegistry sharedStateRegistry) throws Exception { - Callable action = new Callable() { + ZooKeeperStateHandleStore.RemoveCallback removeAction = new ZooKeeperStateHandleStore.RemoveCallback() { @Override - public Void call() throws Exception { - CompletedCheckpoint completedCheckpoint = retrieveCompletedCheckpoint(stateHandleAndPath); - - if (completedCheckpoint != null) { - completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry); - } + public void apply(@Nullable RetrievableStateHandle value) throws FlinkException { + if (value != null) { + final CompletedCheckpoint completedCheckpoint; + + try { + completedCheckpoint = value.retrieveState(); + } catch (Exception e) { + throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e); + } - return null; + if (completedCheckpoint != null) { + try { + completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry); + } catch (Exception e) { + throw new FlinkException("Could not discard the completed checkpoint on subsume.", e); + } + } + } } }; - remove(stateHandleAndPath, action); - } - - private void removeBrokenStateHandle(final Tuple2, String> stateHandleAndPath) throws Exception { - remove(stateHandleAndPath, null); + checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, removeAction); } - /** - * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle. - */ - private void remove( - final Tuple2, String> stateHandleAndPath, - final Callable action) throws Exception { - - BackgroundCallback callback = new BackgroundCallback() { + private void removeBrokenStateHandle( + final String pathInZooKeeper, + final RetrievableStateHandle retrievableStateHandle) throws Exception { + checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new ZooKeeperStateHandleStore.RemoveCallback() { @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - final long checkpointId = pathToCheckpointId(stateHandleAndPath.f1); - + public void apply(@Nullable RetrievableStateHandle value) throws FlinkException { try { - if (event.getType() == CuratorEventType.DELETE) { - if (event.getResultCode() == 0) { - Exception exception = null; - - if (null != action) { - try { - action.call(); - } catch (Exception e) { - exception = new Exception("Could not execute callable action " + - "for checkpoint " + checkpointId + '.', e); - } - } - - try { - // Discard the state handle - stateHandleAndPath.f0.discardState(); - } catch (Exception e) { - Exception newException = new Exception("Could not discard meta " + - "data for completed checkpoint " + checkpointId + '.', e); - - if (exception == null) { - exception = newException; - } else { - exception.addSuppressed(newException); - } - } - - if (exception != null) { - throw exception; - } - } else { - throw new IllegalStateException("Unexpected result code " + - event.getResultCode() + " in '" + event + "' callback."); - } - } else { - throw new IllegalStateException("Unexpected event type " + - event.getType() + " in '" + event + "' callback."); - } + retrievableStateHandle.discardState(); } catch (Exception e) { - LOG.warn("Failed to discard checkpoint {}.", checkpointId, e); + throw new FlinkException("Could not discard state handle.", e); } } - }; - - // Remove state handle from ZooKeeper first. If this fails, we can still recover, but if - // we remove a state handle and fail to remove it from ZooKeeper, we end up in an - // inconsistent state. - checkpointsInZooKeeper.remove(stateHandleAndPath.f1, callback); + }); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java index 2552088f5cf78..fa972ed8b0fbe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java @@ -157,36 +157,46 @@ public void stop() throws Exception { @Override public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception { checkNotNull(jobId, "Job ID"); - String path = getPathForJob(jobId); + final String path = getPathForJob(jobId); LOG.debug("Recovering job graph {} from {}{}.", jobId, zooKeeperFullBasePath, path); synchronized (cacheLock) { verifyIsRunning(); - RetrievableStateHandle jobGraphRetrievableStateHandle; + boolean success = false; try { - jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.get(path); - } catch (KeeperException.NoNodeException ignored) { - return null; - } catch (Exception e) { - throw new Exception("Could not retrieve the submitted job graph state handle " + - "for " + path + "from the submitted job graph store.", e); - } - SubmittedJobGraph jobGraph; + RetrievableStateHandle jobGraphRetrievableStateHandle; - try { - jobGraph = jobGraphRetrievableStateHandle.retrieveState(); - } catch (Exception e) { - throw new Exception("Failed to retrieve the submitted job graph from state handle.", e); - } + try { + jobGraphRetrievableStateHandle = jobGraphsInZooKeeper.getAndLock(path); + } catch (KeeperException.NoNodeException ignored) { + success = true; + return null; + } catch (Exception e) { + throw new Exception("Could not retrieve the submitted job graph state handle " + + "for " + path + "from the submitted job graph store.", e); + } + SubmittedJobGraph jobGraph; - addedJobGraphs.add(jobGraph.getJobId()); + try { + jobGraph = jobGraphRetrievableStateHandle.retrieveState(); + } catch (Exception e) { + throw new Exception("Failed to retrieve the submitted job graph from state handle.", e); + } - LOG.info("Recovered {}.", jobGraph); + addedJobGraphs.add(jobGraph.getJobId()); - return jobGraph; + LOG.info("Recovered {}.", jobGraph); + + success = true; + return jobGraph; + } finally { + if (!success) { + jobGraphsInZooKeeper.release(path); + } + } } } @@ -207,7 +217,7 @@ public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception { if (currentVersion == -1) { try { - jobGraphsInZooKeeper.add(path, jobGraph); + jobGraphsInZooKeeper.addAndLock(path, jobGraph); addedJobGraphs.add(jobGraph.getJobId()); @@ -245,7 +255,7 @@ public void removeJobGraph(JobID jobId) throws Exception { synchronized (cacheLock) { if (addedJobGraphs.contains(jobId)) { - jobGraphsInZooKeeper.removeAndDiscardState(path); + jobGraphsInZooKeeper.releaseAndTryRemove(path); addedJobGraphs.remove(jobId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java index 364ba0f57e3f8..a548f1d171f3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -20,28 +20,38 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.utils.ZKPaths; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * State handles backed by ZooKeeper. + * Class which stores state via the provided {@link RetrievableStateStorageHelper} and writes the + * returned state handle to ZooKeeper. The ZooKeeper node can be locked by creating an ephemeral + * child and only allowing the deletion of the ZooKeeper node if it does not have any children. + * That way we protect concurrent accesses from different ZooKeeperStateHandleStore instances. * *

Added state is persisted via {@link RetrievableStateHandle RetrievableStateHandles}, * which in turn are written to ZooKeeper. This level of indirection is necessary to keep the @@ -80,6 +90,9 @@ public class ZooKeeperStateHandleStore { private final Executor executor; + /** Lock node name of this ZooKeeperStateHandleStore. The name should be unique among all other state handle stores. */ + private final String lockNode; + /** * Creates a {@link ZooKeeperStateHandleStore}. * @@ -99,40 +112,36 @@ public ZooKeeperStateHandleStore( this.client = checkNotNull(client, "Curator client"); this.storage = checkNotNull(storage, "State storage"); this.executor = checkNotNull(executor); - } - /** - * Creates a state handle and stores it in ZooKeeper with create mode {@link - * CreateMode#PERSISTENT}. - * - * @see #add(String, T, CreateMode) - */ - public RetrievableStateHandle add(String pathInZooKeeper, T state) throws Exception { - return add(pathInZooKeeper, state, CreateMode.PERSISTENT); + // Generate a unique lock node name + lockNode = UUID.randomUUID().toString(); } /** - * Creates a state handle and stores it in ZooKeeper. + * Creates a state handle, stores it in ZooKeeper and locks it. A locked node cannot be removed by + * another {@link ZooKeeperStateHandleStore} instance as long as this instance remains connected + * to ZooKeeper. * *

Important: This will not store the actual state in * ZooKeeper, but create a state handle and store it in ZooKeeper. This level of indirection * makes sure that data in ZooKeeper is small. * - * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet and - * start with a '/') + *

The operation will fail if there is already an node under the given path + * + * @param pathInZooKeeper Destination path in ZooKeeper (expected to *not* exist yet) * @param state State to be added - * @param createMode The create mode for the new path in ZooKeeper * * @return The Created {@link RetrievableStateHandle}. * @throws Exception If a ZooKeeper or state handle operation fails */ - public RetrievableStateHandle add( + public RetrievableStateHandle addAndLock( String pathInZooKeeper, - T state, - CreateMode createMode) throws Exception { + T state) throws Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); + final String path = normalizePath(pathInZooKeeper); + RetrievableStateHandle storeHandle = storage.store(state); boolean success = false; @@ -145,7 +154,11 @@ public RetrievableStateHandle add( // smaller than the state itself. This level of indirection makes sure that data in // ZooKeeper is small, because ZooKeeper is designed for data in the KB range, but // the state can be larger. - client.create().withMode(createMode).forPath(pathInZooKeeper, serializedStoreHandle); + // Create the lock node in a transaction with the actual state node. That way we can prevent + // race conditions with a concurrent delete operation. + client.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(path, serializedStoreHandle) + .and().create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path)) + .and().commit(); success = true; return storeHandle; @@ -172,7 +185,9 @@ public void replace(String pathInZooKeeper, int expectedVersion, T state) throws checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); checkNotNull(state, "State"); - RetrievableStateHandle oldStateHandle = get(pathInZooKeeper); + final String path = normalizePath(pathInZooKeeper); + + RetrievableStateHandle oldStateHandle = get(path, false); RetrievableStateHandle newStateHandle = storage.store(state); @@ -185,7 +200,7 @@ public void replace(String pathInZooKeeper, int expectedVersion, T state) throws // Replace state handle in ZooKeeper. client.setData() .withVersion(expectedVersion) - .forPath(pathInZooKeeper, serializedStateHandle); + .forPath(path, serializedStateHandle); success = true; } finally { if(success) { @@ -207,7 +222,9 @@ public void replace(String pathInZooKeeper, int expectedVersion, T state) throws public int exists(String pathInZooKeeper) throws Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); - Stat stat = client.checkExists().forPath(pathInZooKeeper); + final String path = normalizePath(pathInZooKeeper); + + Stat stat = client.checkExists().forPath(path); if (stat != null) { return stat.getVersion(); @@ -217,32 +234,17 @@ public int exists(String pathInZooKeeper) throws Exception { } /** - * Gets a state handle from ZooKeeper. + * Gets the {@link RetrievableStateHandle} stored in the given ZooKeeper node and locks it. A + * locked node cannot be removed by another {@link ZooKeeperStateHandleStore} instance as long + * as this instance remains connected to ZooKeeper. * - * @param pathInZooKeeper Path in ZooKeeper to get the state handle from (expected to - * exist and start with a '/'). - * @return The state handle - * @throws Exception If a ZooKeeper or state handle operation fails + * @param pathInZooKeeper Path to the ZooKeeper node which contains the state handle + * @return The retrieved state handle from the specified ZooKeeper node + * @throws IOException Thrown if the method failed to deserialize the stored state handle + * @throws Exception Thrown if a ZooKeeper operation failed */ - @SuppressWarnings("unchecked") - public RetrievableStateHandle get(String pathInZooKeeper) throws Exception { - checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); - - byte[] data; - - try { - data = client.getData().forPath(pathInZooKeeper); - } catch (Exception e) { - throw new Exception("Failed to retrieve state handle data under " + pathInZooKeeper + - " from ZooKeeper.", e); - } - - try { - return InstantiationUtil.deserializeObject(data, Thread.currentThread().getContextClassLoader()); - } catch (IOException | ClassNotFoundException e) { - throw new IOException("Failed to deserialize state handle from ZooKeeper data from " + - pathInZooKeeper + '.', e); - } + public RetrievableStateHandle getAndLock(String pathInZooKeeper) throws Exception { + return get(pathInZooKeeper, true); } /** @@ -270,7 +272,7 @@ public Collection getAllPaths() throws Exception { } /** - * Gets all available state handles from ZooKeeper. + * Gets all available state handles from ZooKeeper and locks the respective state nodes. * *

If there is a concurrent modification, the operation is retried until it succeeds. * @@ -278,7 +280,7 @@ public Collection getAllPaths() throws Exception { * @throws Exception If a ZooKeeper or state handle operation fails */ @SuppressWarnings("unchecked") - public List, String>> getAll() throws Exception { + public List, String>> getAllAndLock() throws Exception { final List, String>> stateHandles = new ArrayList<>(); boolean success = false; @@ -300,7 +302,7 @@ public List, String>> getAll() throws Exception path = "/" + path; try { - final RetrievableStateHandle stateHandle = get(path); + final RetrievableStateHandle stateHandle = getAndLock(path); stateHandles.add(new Tuple2<>(stateHandle, path)); } catch (KeeperException.NoNodeException ignored) { // Concurrent deletion, retry @@ -323,7 +325,8 @@ public List, String>> getAll() throws Exception /** - * Gets all available state handles from ZooKeeper sorted by name (ascending). + * Gets all available state handles from ZooKeeper sorted by name (ascending) and locks the + * respective state nodes. * *

If there is a concurrent modification, the operation is retried until it succeeds. * @@ -331,7 +334,7 @@ public List, String>> getAll() throws Exception * @throws Exception If a ZooKeeper or state handle operation fails */ @SuppressWarnings("unchecked") - public List, String>> getAllSortedByName() throws Exception { + public List, String>> getAllSortedByNameAndLock() throws Exception { final List, String>> stateHandles = new ArrayList<>(); boolean success = false; @@ -355,14 +358,16 @@ public List, String>> getAllSortedByName() thro path = "/" + path; try { - final RetrievableStateHandle stateHandle = get(path); + final RetrievableStateHandle stateHandle = getAndLock(path); stateHandles.add(new Tuple2<>(stateHandle, path)); } catch (KeeperException.NoNodeException ignored) { // Concurrent deletion, retry continue retry; } catch (IOException ioException) { LOG.warn("Could not get all ZooKeeper children. Node {} contained " + - "corrupted data. Ignoring this node.", path, ioException); + "corrupted data. Releasing and trying to remove this node.", path, ioException); + + releaseAndTryRemove(path); } } @@ -370,6 +375,9 @@ public List, String>> getAllSortedByName() thro // Check for concurrent modifications success = initialCVersion == finalCVersion; + + // we don't have to release all locked nodes in case of a concurrent modification, because we + // will retrieve them in the next iteration again. } } @@ -377,75 +385,306 @@ public List, String>> getAllSortedByName() thro } /** - * Removes a state handle from ZooKeeper. + * Releases the lock for the given state node and tries to remove the state node if it is no longer locked. + * The deletion of the state node is executed asynchronously. * - *

Important: this does not discard the state handle. If you want to - * discard the state handle call {@link #removeAndDiscardState(String)}. + *

Important: This also discards the stored state handle after the given action + * has been executed. * * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/') * @throws Exception If the ZooKeeper operation fails */ - public void remove(String pathInZooKeeper) throws Exception { - checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); - - client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper); + public void releaseAndTryRemove(String pathInZooKeeper) throws Exception { + releaseAndTryRemove(pathInZooKeeper, null); } /** - * Removes a state handle from ZooKeeper asynchronously. + * Releases the lock for the given state node and tries to remove the state node if it is no longer locked. + * The deletion of the state node is executed asynchronously. After the state node has been deleted, the given + * callback is called with the {@link RetrievableStateHandle} of the deleted state node. * - *

Important: this does not discard the state handle. If you want to - * discard the state handle call {@link #removeAndDiscardState(String)}. + *

Important: This also discards the stored state handle after the given action + * has been executed. * - * @param pathInZooKeeper Path of state handle to remove (expected to start with a '/') - * @param callback The callback after the operation finishes + * @param pathInZooKeeper Path of state handle to remove + * @param callback The callback to execute after a successful deletion. Null if no action needs to be executed. * @throws Exception If the ZooKeeper operation fails */ - public void remove(String pathInZooKeeper, BackgroundCallback callback) throws Exception { + public void releaseAndTryRemove( + String pathInZooKeeper, + @Nullable final RemoveCallback callback) throws Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); - checkNotNull(callback, "Background callback"); - client.delete().deletingChildrenIfNeeded().inBackground(callback, executor).forPath(pathInZooKeeper); + final String path = normalizePath(pathInZooKeeper); + + RetrievableStateHandle stateHandle = null; + + try { + stateHandle = get(path, false); + } catch (Exception e) { + LOG.warn("Could not retrieve the state handle from node " + path + '.', e); + } + + release(pathInZooKeeper); + + final BackgroundCallback backgroundCallback = new RemoveBackgroundCallback<>(stateHandle, callback, path); + + client.delete().inBackground(backgroundCallback, executor).forPath(path); } /** - * Discards a state handle and removes it from ZooKeeper. + * Releases all lock nodes of this ZooKeeperStateHandleStores and tries to remove all state nodes which + * are not locked anymore. * - *

If you only want to remove the state handle in ZooKeeper call {@link #remove(String)}. + *

The delete operation is executed asynchronously * - * @param pathInZooKeeper Path of state handle to discard (expected to start with a '/') - * @throws Exception If the ZooKeeper or state handle operation fails + * @throws Exception if the delete operation fails */ - public void removeAndDiscardState(String pathInZooKeeper) throws Exception { + public void releaseAndTryRemoveAll() throws Exception { + Collection children = getAllPaths(); + + Exception exception = null; + + for (String child : children) { + try { + releaseAndTryRemove('/' + child); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + } + + if (exception != null) { + throw new Exception("Could not properly release and try removing all state nodes.", exception); + } + } + + /** + * Releases the lock from the node under the given ZooKeeper path. If no lock exists, then nothing happens. + * + * @param pathInZooKeeper Path describing the ZooKeeper node + * @throws Exception if the delete operation of the lock node fails + */ + public void release(String pathInZooKeeper) throws Exception { + final String path = normalizePath(pathInZooKeeper); + + try { + client.delete().forPath(getLockPath(path)); + } catch (KeeperException.NoNodeException ignored) { + // we have never locked this node + } catch (Exception e) { + throw new Exception("Could not release the lock: " + getLockPath(pathInZooKeeper) + '.', e); + } + } + + /** + * Releases all lock nodes of this ZooKeeperStateHandleStore. + * + * @throws Exception if the delete operation of a lock file fails + */ + public void releaseAll() throws Exception { + Collection children = getAllPaths(); + + Exception exception = null; + + for (String child: children) { + try { + release(child); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + } + + if (exception != null) { + throw new Exception("Could not properly release all state nodes.", exception); + } + } + + // --------------------------------------------------------------------------------------------------------- + // Protected methods + // --------------------------------------------------------------------------------------------------------- + + /** + * Returns the path for the lock node relative to the given path. + * + * @param rootPath Root path under which the lock node shall be created + * @return Path for the lock node + */ + protected String getLockPath(String rootPath) { + return rootPath + '/' + lockNode; + } + + // --------------------------------------------------------------------------------------------------------- + // Private methods + // --------------------------------------------------------------------------------------------------------- + + /** + * Gets a state handle from ZooKeeper and optionally locks it. + * + * @param pathInZooKeeper Path in ZooKeeper to get the state handle from + * @param lock True if we should lock the node; otherwise false + * @return The state handle + * @throws IOException Thrown if the method failed to deserialize the stored state handle + * @throws Exception Thrown if a ZooKeeper operation failed + */ + @SuppressWarnings("unchecked") + private RetrievableStateHandle get(String pathInZooKeeper, boolean lock) throws Exception { checkNotNull(pathInZooKeeper, "Path in ZooKeeper"); - RetrievableStateHandle stateHandle = get(pathInZooKeeper); + final String path = normalizePath(pathInZooKeeper); + + if (lock) { + // try to lock the node + try { + client.create().withMode(CreateMode.EPHEMERAL).forPath(getLockPath(path)); + } catch (KeeperException.NodeExistsException ignored) { + // we have already created the lock + } catch (KeeperException.NoNodeException e) { + throw new Exception("Cannot lock the node " + path + " since it does not exist.", e); + } + } + + boolean success = false; + + try { + byte[] data; + + try { + data = client.getData().forPath(path); + } catch (Exception e) { + throw new Exception("Failed to retrieve state handle data under " + path + + " from ZooKeeper.", e); + } + + try { + RetrievableStateHandle retrievableStateHandle = InstantiationUtil.deserializeObject( + data, + Thread.currentThread().getContextClassLoader()); - // Delete the state handle from ZooKeeper first - client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper); + success = true; - // Discard the state handle only after it has been successfully deleted from ZooKeeper. - // Otherwise we might enter an illegal state after failures (with a state handle in - // ZooKeeper, which has already been discarded). - stateHandle.discardState(); + return retrievableStateHandle; + } catch (IOException | ClassNotFoundException e) { + throw new IOException("Failed to deserialize state handle from ZooKeeper data from " + + path + '.', e); + } + } finally { + if (!success && lock) { + // release the lock + release(path); + } + } } /** - * Discards all available state handles and removes them from ZooKeeper. + * Makes sure that every path starts with a "/" * - * @throws Exception If a ZooKeeper or state handle operation fails + * @param path Path to normalize + * @return Normalized path such that it starts with a "/" */ - public void removeAndDiscardAllState() throws Exception { - final List, String>> allStateHandles = getAll(); + private static String normalizePath(String path) { + if (path.startsWith("/")) { + return path; + } else { + return '/' + path; + } + } - ZKPaths.deleteChildren( - client.getZookeeperClient().getZooKeeper(), - ZKPaths.fixForNamespace(client.getNamespace(), "/"), - false); + // --------------------------------------------------------------------------------------------------------- + // Utility classes + // --------------------------------------------------------------------------------------------------------- - // Discard the state handles only after they have been successfully deleted from ZooKeeper. - for (Tuple2, String> stateHandleAndPath : allStateHandles) { - stateHandleAndPath.f0.discardState(); + /** + * Callback which is executed when removing a node from ZooKeeper. The callback will call the given + * {@link RemoveCallback} if it is not null. Afterwards, it will discard the given {@link RetrievableStateHandle} + * if it is not null. + * + * @param Type of the value stored in the RetrievableStateHandle + */ + private static final class RemoveBackgroundCallback implements BackgroundCallback { + @Nullable + private final RetrievableStateHandle stateHandle; + + @Nullable + private final RemoveCallback callback; + + private final String pathInZooKeeper; + + private RemoveBackgroundCallback( + @Nullable RetrievableStateHandle stateHandle, + @Nullable RemoveCallback callback, + String pathInZooKeeper) { + + this.stateHandle = stateHandle; + this.callback = callback; + this.pathInZooKeeper = Preconditions.checkNotNull(pathInZooKeeper); } + + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + try { + if (event.getType() == CuratorEventType.DELETE) { + final KeeperException.Code resultCode = KeeperException.Code.get(event.getResultCode()); + + if (resultCode == KeeperException.Code.OK) { + Exception exception = null; + + if (null != callback) { + try { + callback.apply(stateHandle); + } catch (Throwable e) { + exception = new Exception("Could not execute delete action for node " + + pathInZooKeeper + '.', e); + } + } + + if (stateHandle != null) { + try { + // Discard the state handle + stateHandle.discardState(); + } catch (Throwable e) { + Exception newException = new Exception("Could not discard state handle of node " + + pathInZooKeeper + '.', e); + + if (exception == null) { + exception = newException; + } else { + exception.addSuppressed(newException); + } + } + } + + if (exception != null) { + throw exception; + } + } else if (resultCode == KeeperException.Code.NOTEMPTY) { + // Could not delete the node because it still contains children/locks + LOG.debug("Could not delete node " + pathInZooKeeper + " because it is still locked."); + } else { + throw new IllegalStateException("Unexpected result code " + + resultCode.name() + " in '" + event + "' callback."); + } + } else { + throw new IllegalStateException("Unexpected event type " + + event.getType() + " in '" + event + "' callback."); + } + } catch (Exception e) { + LOG.warn("Failed to run callback for delete operation on node " + pathInZooKeeper + '.', e); + } + + } + }; + + /** + * Callback interface for remove calls + */ + public interface RemoveCallback { + /** + * Callback method. The parameter can be null if the {@link RetrievableStateHandle} could not be retrieved + * from ZooKeeper. + * + * @param value RetrievableStateHandle retrieved from ZooKeeper, null if it was not retrievable + * @throws FlinkException If the callback failed + */ + void apply(@Nullable RetrievableStateHandle value) throws FlinkException; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 94bd12f893fb4..985c662874464 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -307,6 +308,14 @@ public void awaitDiscard() throws InterruptedException { } } + public boolean awaitDiscard(long timeout) throws InterruptedException { + if (discardLatch != null) { + return discardLatch.await(timeout, TimeUnit.MILLISECONDS); + } else { + return false; + } + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 3fd7f1b12bfd7..0d932892621e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -24,50 +24,52 @@ import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; +import org.apache.zookeeper.data.Stat; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * Tests for basic {@link CompletedCheckpointStore} contract and ZooKeeper state handling. */ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest { - private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); + private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1); - private final static String CheckpointsPath = "/checkpoints"; + private static final String CHECKPOINT_PATH = "/checkpoints"; @AfterClass public static void tearDown() throws Exception { - if (ZooKeeper != null) { - ZooKeeper.shutdown(); + if (ZOOKEEPER != null) { + ZOOKEEPER.shutdown(); } } @Before public void cleanUp() throws Exception { - ZooKeeper.deleteAll(); + ZOOKEEPER.deleteAll(); } @Override - protected AbstractCompletedCheckpointStore createCompletedCheckpoints( - int maxNumberOfCheckpointsToRetain) throws Exception { - + protected ZooKeeperCompletedCheckpointStore createCompletedCheckpoints(int maxNumberOfCheckpointsToRetain) throws Exception { return new ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, - ZooKeeper.createClient(), CheckpointsPath, new RetrievableStateStorageHelper() { - @Override - public RetrievableStateHandle store(CompletedCheckpoint state) throws Exception { - return new HeapRetrievableStateHandle<>(state); - } - }, Executors.directExecutor()); + ZOOKEEPER.getClient(), + CHECKPOINT_PATH, + new HeapStateStorageHelper(), + Executors.directExecutor()); } // --------------------------------------------------------------------------------------------- @@ -95,7 +97,7 @@ public void testRecover() throws Exception { verifyCheckpointRegistered(expected[2].getOperatorStates().values(), checkpoints.sharedStateRegistry); // All three should be in ZK - assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size()); + assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); resetCheckpoint(expected[0].getOperatorStates().values()); @@ -105,7 +107,7 @@ public void testRecover() throws Exception { // Recover TODO!!! clear registry! checkpoints.recover(); - assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size()); + assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); assertEquals(expected[2], checkpoints.getLatestCheckpoint()); @@ -130,18 +132,18 @@ public void testRecover() throws Exception { */ @Test public void testShutdownDiscardsCheckpoints() throws Exception { - CuratorFramework client = ZooKeeper.getClient(); + CuratorFramework client = ZOOKEEPER.getClient(); CompletedCheckpointStore store = createCompletedCheckpoints(1); TestCompletedCheckpoint checkpoint = createCheckpoint(0); store.addCheckpoint(checkpoint); assertEquals(1, store.getNumberOfRetainedCheckpoints()); - assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); + assertNotNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); store.shutdown(JobStatus.FINISHED); assertEquals(0, store.getNumberOfRetainedCheckpoints()); - assertNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); + assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); store.recover(); @@ -149,24 +151,30 @@ public void testShutdownDiscardsCheckpoints() throws Exception { } /** - * Tests that suspends keeps all checkpoints (as they can be recovered - * later by the ZooKeeper store). + * Tests that suspends keeps all checkpoints (so that they can be recovered + * later by the ZooKeeper store). Furthermore, suspending a job should release + * all locks. */ @Test public void testSuspendKeepsCheckpoints() throws Exception { - CuratorFramework client = ZooKeeper.getClient(); + CuratorFramework client = ZOOKEEPER.getClient(); CompletedCheckpointStore store = createCompletedCheckpoints(1); TestCompletedCheckpoint checkpoint = createCheckpoint(0); store.addCheckpoint(checkpoint); assertEquals(1, store.getNumberOfRetainedCheckpoints()); - assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); + assertNotNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); store.shutdown(JobStatus.SUSPENDED); assertEquals(0, store.getNumberOfRetainedCheckpoints()); - assertNotNull(client.checkExists().forPath(CheckpointsPath + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); + + final String checkpointPath = CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()); + Stat stat = client.checkExists().forPath(checkpointPath); + + assertNotNull("The checkpoint node should exist.", stat); + assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren()); // Recover again store.recover(); @@ -201,24 +209,91 @@ public void testLatestCheckpointRecovery() throws Exception { assertEquals(checkpoints.get(checkpoints.size() -1), latestCheckpoint); } + /** + * FLINK-6612 + * + * Checks that a concurrent checkpoint completion won't discard a checkpoint which has been + * recovered by a different completed checkpoint store. + */ + @Test + public void testConcurrentCheckpointOperations() throws Exception { + final int numberOfCheckpoints = 1; + final long waitingTimeout = 50L; + + ZooKeeperCompletedCheckpointStore zkCheckpointStore1 = createCompletedCheckpoints(numberOfCheckpoints); + ZooKeeperCompletedCheckpointStore zkCheckpointStore2 = createCompletedCheckpoints(numberOfCheckpoints); + + TestCompletedCheckpoint completedCheckpoint = createCheckpoint(1); + + // complete the first checkpoint + zkCheckpointStore1.addCheckpoint(completedCheckpoint); + + // recover the checkpoint by a different checkpoint store + zkCheckpointStore2.recover(); + + CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint(); + assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint); + TestCompletedCheckpoint recoveredTestCheckpoint = (TestCompletedCheckpoint) recoveredCheckpoint; + + // Check that the recovered checkpoint is not yet discarded + assertFalse(recoveredTestCheckpoint.isDiscarded()); + + // complete another checkpoint --> this should remove the first checkpoint from the store + // because the number of retained checkpoints == 1 + TestCompletedCheckpoint completedCheckpoint2 = createCheckpoint(2); + zkCheckpointStore1.addCheckpoint(completedCheckpoint2); + + List allCheckpoints = zkCheckpointStore1.getAllCheckpoints(); + + // check that we have removed the first checkpoint from zkCompletedStore1 + assertEquals(Collections.singletonList(completedCheckpoint2), allCheckpoints); + + // lets wait a little bit to see that no discard operation will be executed + assertFalse("The checkpoint should not have been discarded.", recoveredTestCheckpoint.awaitDiscard(waitingTimeout)); + + // check that we have not discarded the first completed checkpoint + assertFalse(recoveredTestCheckpoint.isDiscarded()); + + TestCompletedCheckpoint completedCheckpoint3 = createCheckpoint(3); + + // this should release the last lock on completedCheckoint and thus discard it + zkCheckpointStore2.addCheckpoint(completedCheckpoint3); + + // the checkpoint should be discarded eventually because there is no lock on it anymore + recoveredTestCheckpoint.awaitDiscard(); + } + + + static class HeapStateStorageHelper implements RetrievableStateStorageHelper { + @Override + public RetrievableStateHandle store(CompletedCheckpoint state) throws Exception { + return new HeapRetrievableStateHandle<>(state); + } + } + static class HeapRetrievableStateHandle implements RetrievableStateHandle { private static final long serialVersionUID = -268548467968932L; + private static AtomicInteger nextKey = new AtomicInteger(0); + + private static HashMap stateMap = new HashMap<>(); + + private final int key; + public HeapRetrievableStateHandle(T state) { - this.state = state; + key = nextKey.getAndIncrement(); + stateMap.put(key, state); } - private T state; - @Override public T retrieveState() throws Exception { - return state; + return (T) stateMap.get(key); } @Override public void discardState() throws Exception { - state = null; + stateMap.remove(key); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 0d22dc6b4863b..7d22d8ef2005b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -110,7 +110,7 @@ public void testCheckpointRecovery() throws Exception { ZooKeeperStateHandleStore zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock, Executors.directExecutor())); whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock); - doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByName(); + doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock(); final int numCheckpointsToRetain = 1; @@ -126,7 +126,6 @@ public void testCheckpointRecovery() throws Exception { when( client .delete() - .deletingChildrenIfNeeded() .inBackground(any(BackgroundCallback.class), any(Executor.class)) ).thenAnswer(new Answer>() { @Override @@ -150,13 +149,13 @@ public Void answer(InvocationOnMock invocation) throws Throwable { }); final String checkpointsPath = "foobar"; - final RetrievableStateStorageHelper stateSotrage = mock(RetrievableStateStorageHelper.class); + final RetrievableStateStorageHelper stateStorage = mock(RetrievableStateStorageHelper.class); ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore( numCheckpointsToRetain, client, checkpointsPath, - stateSotrage, + stateStorage, Executors.directExecutor()); zooKeeperCompletedCheckpointStore.recover(); @@ -209,9 +208,9 @@ public RetrievableStateHandle answer(InvocationOnMock invoc return retrievableStateHandle; } - }).when(zookeeperStateHandleStoreMock).add(anyString(), any(CompletedCheckpoint.class)); + }).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), any(CompletedCheckpoint.class)); - doThrow(new Exception()).when(zookeeperStateHandleStoreMock).remove(anyString(), any(BackgroundCallback.class)); + doThrow(new Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(), any(ZooKeeperStateHandleStore.RemoveCallback.class)); final int numCheckpointsToRetain = 1; final String checkpointsPath = "foobar"; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java similarity index 55% rename from flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java index 4dc4c6bd8a3b5..0c215cdf5a0d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java @@ -19,14 +19,14 @@ package org.apache.flink.runtime.zookeeper; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.junit.AfterClass; import org.junit.Before; @@ -34,7 +34,9 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -42,11 +44,12 @@ import java.util.concurrent.CountDownLatch; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -63,118 +66,68 @@ *

  • Correct ordering of ZooKeeper and state handle operations
  • * */ -public class ZooKeeperStateHandleStoreITCase extends TestLogger { +public class ZooKeeperStateHandleStoreTest extends TestLogger { - private final static ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1); + private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1); @AfterClass public static void tearDown() throws Exception { - if (ZooKeeper != null) { - ZooKeeper.shutdown(); + if (ZOOKEEPER != null) { + ZOOKEEPER.shutdown(); } } @Before public void cleanUp() throws Exception { - ZooKeeper.deleteAll(); + ZOOKEEPER.deleteAll(); } /** - * Tests add operation with default {@link CreateMode}. + * Tests add operation with lock. */ @Test - public void testAdd() throws Exception { + public void testAddAndLock() throws Exception { LongStateStorage longStateStorage = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore( - ZooKeeper.getClient(), longStateStorage, Executors.directExecutor()); + ZOOKEEPER.getClient(), longStateStorage, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testAdd"; final Long state = 1239712317L; // Test - store.add(pathInZooKeeper, state); + store.addAndLock(pathInZooKeeper, state); // Verify // State handle created - assertEquals(1, store.getAll().size()); - assertEquals(state, store.get(pathInZooKeeper).retrieveState()); + assertEquals(1, store.getAllAndLock().size()); + assertEquals(state, store.getAndLock(pathInZooKeeper).retrieveState()); // Path created and is persistent - Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); + Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper); assertNotNull(stat); assertEquals(0, stat.getEphemeralOwner()); + List children = ZOOKEEPER.getClient().getChildren().forPath(pathInZooKeeper); + + // there should be one child which is the lock + assertEquals(1, children.size()); + + stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper + '/' + children.get(0)); + assertNotNull(stat); + + // check that the child is an ephemeral node + assertNotEquals(0, stat.getEphemeralOwner()); + // Data is equal @SuppressWarnings("unchecked") Long actual = ((RetrievableStateHandle) InstantiationUtil.deserializeObject( - ZooKeeper.getClient().getData().forPath(pathInZooKeeper), + ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), ClassLoader.getSystemClassLoader())).retrieveState(); assertEquals(state, actual); } - /** - * Tests that {@link CreateMode} is respected. - */ - @Test - public void testAddWithCreateMode() throws Exception { - LongStateStorage longStateStorage = new LongStateStorage(); - ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore( - ZooKeeper.getClient(), longStateStorage, Executors.directExecutor()); - - // Config - Long state = 3457347234L; - - CreateMode[] modes = CreateMode.values(); - for (int i = 0; i < modes.length; i++) { - CreateMode mode = modes[i]; - state += i; - - String pathInZooKeeper = "/testAddWithCreateMode" + mode.name(); - - // Test - store.add(pathInZooKeeper, state, mode); - - if (mode.isSequential()) { - // Figure out the sequential ID - List paths = ZooKeeper.getClient().getChildren().forPath("/"); - for (String p : paths) { - if (p.startsWith("testAddWithCreateMode" + mode.name())) { - pathInZooKeeper = "/" + p; - break; - } - } - } - - // Verify - // State handle created - assertEquals(i + 1, store.getAll().size()); - assertEquals(state, longStateStorage.getStateHandles().get(i).retrieveState()); - - // Path created - Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); - - assertNotNull(stat); - - // Is ephemeral or persistent - if (mode.isEphemeral()) { - assertTrue(stat.getEphemeralOwner() != 0); - } - else { - assertEquals(0, stat.getEphemeralOwner()); - } - - // Data is equal - @SuppressWarnings("unchecked") - Long actual = ((RetrievableStateHandle) InstantiationUtil.deserializeObject( - ZooKeeper.getClient().getData().forPath(pathInZooKeeper), - ClassLoader.getSystemClassLoader())).retrieveState(); - - assertEquals(state, actual); - } - } - /** * Tests that an existing path throws an Exception. */ @@ -183,11 +136,17 @@ public void testAddAlreadyExistingPath() throws Exception { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); + ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor()); + + ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath"); - ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath"); + store.addAndLock("/testAddAlreadyExistingPath", 1L); - store.add("/testAddAlreadyExistingPath", 1L); + // writing to the state storage should have succeeded + assertEquals(1, stateHandleProvider.getStateHandles()); + + // the created state handle should have been cleaned up if the add operation failed + assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls()); } /** @@ -198,8 +157,8 @@ public void testAddDiscardStateHandleAfterFailure() throws Exception { // Setup LongStateStorage stateHandleProvider = new LongStateStorage(); - CuratorFramework client = spy(ZooKeeper.getClient()); - when(client.create()).thenThrow(new RuntimeException("Expected test Exception.")); + CuratorFramework client = spy(ZOOKEEPER.getClient()); + when(client.inTransaction().create()).thenThrow(new RuntimeException("Expected test Exception.")); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( client, stateHandleProvider, Executors.directExecutor()); @@ -210,7 +169,7 @@ public void testAddDiscardStateHandleAfterFailure() throws Exception { try { // Test - store.add(pathInZooKeeper, state); + store.addAndLock(pathInZooKeeper, state); fail("Did not throw expected exception"); } catch (Exception ignored) { @@ -232,7 +191,7 @@ public void testReplace() throws Exception { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); + ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testReplace"; @@ -240,7 +199,7 @@ public void testReplace() throws Exception { final Long replaceState = 88383776661L; // Test - store.add(pathInZooKeeper, initialState); + store.addAndLock(pathInZooKeeper, initialState); store.replace(pathInZooKeeper, 0, replaceState); // Verify @@ -250,14 +209,14 @@ public void testReplace() throws Exception { assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState()); // Path created and is persistent - Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); + Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper); assertNotNull(stat); assertEquals(0, stat.getEphemeralOwner()); // Data is equal @SuppressWarnings("unchecked") Long actual = ((RetrievableStateHandle) InstantiationUtil.deserializeObject( - ZooKeeper.getClient().getData().forPath(pathInZooKeeper), + ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), ClassLoader.getSystemClassLoader())).retrieveState(); assertEquals(replaceState, actual); @@ -271,7 +230,7 @@ public void testReplaceNonExistingPath() throws Exception { RetrievableStateStorageHelper stateStorage = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateStorage, Executors.directExecutor()); + ZOOKEEPER.getClient(), stateStorage, Executors.directExecutor()); store.replace("/testReplaceNonExistingPath", 0, 1L); } @@ -284,7 +243,7 @@ public void testReplaceDiscardStateHandleAfterFailure() throws Exception { // Setup LongStateStorage stateHandleProvider = new LongStateStorage(); - CuratorFramework client = spy(ZooKeeper.getClient()); + CuratorFramework client = spy(ZOOKEEPER.getClient()); when(client.setData()).thenThrow(new RuntimeException("Expected test Exception.")); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( @@ -296,7 +255,7 @@ public void testReplaceDiscardStateHandleAfterFailure() throws Exception { final Long replaceState = 88383776661L; // Test - store.add(pathInZooKeeper, initialState); + store.addAndLock(pathInZooKeeper, initialState); try { store.replace(pathInZooKeeper, 0, replaceState); @@ -315,7 +274,7 @@ public void testReplaceDiscardStateHandleAfterFailure() throws Exception { // Initial value @SuppressWarnings("unchecked") Long actual = ((RetrievableStateHandle) InstantiationUtil.deserializeObject( - ZooKeeper.getClient().getData().forPath(pathInZooKeeper), + ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper), ClassLoader.getSystemClassLoader())).retrieveState(); assertEquals(initialState, actual); @@ -330,7 +289,7 @@ public void testGetAndExists() throws Exception { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); + ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testGetAndExists"; @@ -339,8 +298,8 @@ public void testGetAndExists() throws Exception { // Test assertEquals(-1, store.exists(pathInZooKeeper)); - store.add(pathInZooKeeper, state); - RetrievableStateHandle actual = store.get(pathInZooKeeper); + store.addAndLock(pathInZooKeeper, state); + RetrievableStateHandle actual = store.getAndLock(pathInZooKeeper); // Verify assertEquals(state, actual.retrieveState()); @@ -355,9 +314,9 @@ public void testGetNonExistingPath() throws Exception { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); + ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor()); - store.get("/testGetNonExistingPath"); + store.getAndLock("/testGetNonExistingPath"); } /** @@ -369,7 +328,7 @@ public void testGetAll() throws Exception { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); + ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testGetAll"; @@ -382,10 +341,10 @@ public void testGetAll() throws Exception { // Test for (long val : expected) { - store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL); + store.addAndLock(pathInZooKeeper + val, val); } - for (Tuple2, String> val : store.getAll()) { + for (Tuple2, String> val : store.getAllAndLock()) { assertTrue(expected.remove(val.f0.retrieveState())); } assertEquals(0, expected.size()); @@ -400,22 +359,26 @@ public void testGetAllSortedByName() throws Exception { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); + ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor()); // Config - final String pathInZooKeeper = "/testGetAllSortedByName"; + final String basePath = "/testGetAllSortedByName"; final Long[] expected = new Long[] { 311222268470898L, 132812888L, 27255442L, 11122233124L }; // Test for (long val : expected) { - store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL); + final String pathInZooKeeper = String.format("%s%016d", basePath, val); + store.addAndLock(pathInZooKeeper, val); } - List, String>> actual = store.getAllSortedByName(); + List, String>> actual = store.getAllSortedByNameAndLock(); assertEquals(expected.length, actual.size()); + // bring the elements in sort order + Arrays.sort(expected); + for (int i = 0; i < expected.length; i++) { assertEquals(expected[i], actual.get(i).f0.retrieveState()); } @@ -430,19 +393,19 @@ public void testRemove() throws Exception { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); + ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testRemove"; final Long state = 27255442L; - store.add(pathInZooKeeper, state); + store.addAndLock(pathInZooKeeper, state); // Test - store.remove(pathInZooKeeper); + store.releaseAndTryRemove(pathInZooKeeper); // Verify discarded - assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size()); + assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size()); } /** @@ -454,68 +417,44 @@ public void testRemoveWithCallback() throws Exception { LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); + ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testRemoveWithCallback"; final Long state = 27255442L; - store.add(pathInZooKeeper, state); + store.addAndLock(pathInZooKeeper, state); final CountDownLatch sync = new CountDownLatch(1); - BackgroundCallback callback = mock(BackgroundCallback.class); + ZooKeeperStateHandleStore.RemoveCallback callback = mock(ZooKeeperStateHandleStore.RemoveCallback.class); doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { sync.countDown(); return null; } - }).when(callback).processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class)); + }).when(callback).apply(any(RetrievableStateHandle.class)); // Test - store.remove(pathInZooKeeper, callback); + store.releaseAndTryRemove(pathInZooKeeper, callback); // Verify discarded and callback called - assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size()); + assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size()); sync.await(); verify(callback, times(1)) - .processResult(eq(ZooKeeper.getClient()), any(CuratorEvent.class)); - } - - /** - * Tests that state handles are correctly discarded. - */ - @Test - public void testRemoveAndDiscardState() throws Exception { - // Setup - LongStateStorage stateHandleProvider = new LongStateStorage(); - - ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); - - // Config - final String pathInZooKeeper = "/testDiscard"; - final Long state = 27255442L; - - store.add(pathInZooKeeper, state); - - // Test - store.removeAndDiscardState(pathInZooKeeper); - - // Verify discarded - assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size()); + .apply(any(RetrievableStateHandle.class)); } /** Tests that all state handles are correctly discarded. */ @Test - public void testRemoveAndDiscardAllState() throws Exception { + public void testReleaseAndTryRemoveAll() throws Exception { // Setup LongStateStorage stateHandleProvider = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), stateHandleProvider, Executors.directExecutor()); + ZOOKEEPER.getClient(), stateHandleProvider, Executors.directExecutor()); // Config final String pathInZooKeeper = "/testDiscardAll"; @@ -528,25 +467,25 @@ public void testRemoveAndDiscardAllState() throws Exception { // Test for (long val : expected) { - store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL); + store.addAndLock(pathInZooKeeper + val, val); } - store.removeAndDiscardAllState(); + store.releaseAndTryRemoveAll(); // Verify all discarded - assertEquals(0, ZooKeeper.getClient().getChildren().forPath("/").size()); + assertEquals(0, ZOOKEEPER.getClient().getChildren().forPath("/").size()); } /** - * Tests that the ZooKeeperStateHandleStore can handle corrupted data by ignoring the respective - * ZooKeeper ZNodes. + * Tests that the ZooKeeperStateHandleStore can handle corrupted data by releasing and trying to remove the + * respective ZooKeeper ZNodes. */ @Test public void testCorruptedData() throws Exception { LongStateStorage stateStorage = new LongStateStorage(); ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore<>( - ZooKeeper.getClient(), + ZOOKEEPER.getClient(), stateStorage, Executors.directExecutor()); @@ -556,13 +495,13 @@ public void testCorruptedData() throws Exception { input.add(3L); for (Long aLong : input) { - store.add("/" + aLong, aLong); + store.addAndLock("/" + aLong, aLong); } // corrupt one of the entries - ZooKeeper.getClient().setData().forPath("/" + 2, new byte[2]); + ZOOKEEPER.getClient().setData().forPath("/" + 2, new byte[2]); - List, String>> allEntries = store.getAll(); + List, String>> allEntries = store.getAllAndLock(); Collection expected = new HashSet<>(input); expected.remove(2L); @@ -576,7 +515,7 @@ public void testCorruptedData() throws Exception { assertEquals(expected, actual); // check the same for the all sorted by name call - allEntries = store.getAllSortedByName(); + allEntries = store.getAllSortedByNameAndLock(); actual.clear(); @@ -585,6 +524,230 @@ public void testCorruptedData() throws Exception { } assertEquals(expected, actual); + + Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/" + 2); + + // check that the corrupted node no longer exists + assertNull("The corrupted node should no longer exist.", stat); + } + + /** + * FLINK-6612 + * + * Tests that a concurrent delete operation cannot succeed if another instance holds a lock on the specified + * node. + */ + @Test + public void testConcurrentDeleteOperation() throws Exception { + LongStateStorage longStateStorage = new LongStateStorage(); + + ZooKeeperStateHandleStore zkStore1 = new ZooKeeperStateHandleStore<>( + ZOOKEEPER.getClient(), + longStateStorage, + Executors.directExecutor()); + + ZooKeeperStateHandleStore zkStore2 = new ZooKeeperStateHandleStore<>( + ZOOKEEPER.getClient(), + longStateStorage, + Executors.directExecutor()); + + final String statePath = "/state"; + + zkStore1.addAndLock(statePath, 42L); + RetrievableStateHandle stateHandle = zkStore2.getAndLock(statePath); + + // this should not remove the referenced node because we are still holding a state handle + // reference via zkStore2 + zkStore1.releaseAndTryRemove(statePath); + + // sanity check + assertEquals(42L, (long) stateHandle.retrieveState()); + + Stat nodeStat = ZOOKEEPER.getClient().checkExists().forPath(statePath); + + assertNotNull("NodeStat should not be null, otherwise the referenced node does not exist.", nodeStat); + + zkStore2.releaseAndTryRemove(statePath); + + nodeStat = ZOOKEEPER.getClient().checkExists().forPath(statePath); + + assertNull("NodeState should be null, because the referenced node should no longer exist.", nodeStat); + } + + /** + * FLINK-6612 + * + * Tests that getAndLock removes a created lock if the RetrievableStateHandle cannot be retrieved + * (e.g. deserialization problem). + */ + @Test + public void testLockCleanupWhenGetAndLockFails() throws Exception { + LongStateStorage longStateStorage = new LongStateStorage(); + + ZooKeeperStateHandleStore zkStore1 = new ZooKeeperStateHandleStore<>( + ZOOKEEPER.getClient(), + longStateStorage, + Executors.directExecutor()); + + ZooKeeperStateHandleStore zkStore2 = new ZooKeeperStateHandleStore<>( + ZOOKEEPER.getClient(), + longStateStorage, + Executors.directExecutor()); + + final String path = "/state"; + + zkStore1.addAndLock(path, 42L); + + final byte[] corruptedData = {1, 2}; + + // corrupt the data + ZOOKEEPER.getClient().setData().forPath(path, corruptedData); + + try { + zkStore2.getAndLock(path); + fail("Should fail because we cannot deserialize the node's data"); + } catch (IOException ignored) { + // expected to fail + } + + // check that there is no lock node left + String lockNodePath = zkStore2.getLockPath(path); + + Stat stat = ZOOKEEPER.getClient().checkExists().forPath(lockNodePath); + + // zkStore2 should not have created a lock node + assertNull("zkStore2 should not have created a lock node.", stat); + + Collection children = ZOOKEEPER.getClient().getChildren().forPath(path); + + // there should be exactly one lock node from zkStore1 + assertEquals(1, children.size()); + + zkStore1.releaseAndTryRemove(path); + + stat = ZOOKEEPER.getClient().checkExists().forPath(path); + + assertNull("The state node should have been removed.", stat); + } + + /** + * FLINK-6612 + * + * Tests that lock nodes will be released if the client dies. + */ + @Test + public void testLockCleanupWhenClientTimesOut() throws Exception { + LongStateStorage longStateStorage = new LongStateStorage(); + + Configuration configuration = new Configuration(); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOOKEEPER.getConnectString()); + configuration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 100); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, "timeout"); + + try (CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration); + CuratorFramework client2 = ZooKeeperUtils.startCuratorFramework(configuration)) { + + ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>( + client, + longStateStorage, + Executors.directExecutor()); + + final String path = "/state"; + + zkStore.addAndLock(path, 42L); + + // this should delete all ephemeral nodes + client.close(); + + Stat stat = client2.checkExists().forPath(path); + + // check that our state node still exists + assertNotNull(stat); + + Collection children = client2.getChildren().forPath(path); + + // check that the lock node has been released + assertEquals(0, children.size()); + } + } + + /** + * FLINK-6612 + * + * Tests that we can release a locked state handles in the ZooKeeperStateHandleStore. + */ + @Test + public void testRelease() throws Exception { + LongStateStorage longStateStorage = new LongStateStorage(); + + ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>( + ZOOKEEPER.getClient(), + longStateStorage, + Executors.directExecutor()); + + final String path = "/state"; + + zkStore.addAndLock(path, 42L); + + final String lockPath = zkStore.getLockPath(path); + + Stat stat = ZOOKEEPER.getClient().checkExists().forPath(lockPath); + + assertNotNull("Expected an existing lock", stat); + + zkStore.release(path); + + stat = ZOOKEEPER.getClient().checkExists().forPath(path); + + // release should have removed the lock child + assertEquals("Expected no lock nodes as children", 0, stat.getNumChildren()); + + zkStore.releaseAndTryRemove(path); + + stat = ZOOKEEPER.getClient().checkExists().forPath(path); + + assertNull("State node should have been removed.",stat); + } + + /** + * FLINK-6612 + * + * Tests that we can release all locked state handles in the ZooKeeperStateHandleStore + */ + @Test + public void testReleaseAll() throws Exception { + LongStateStorage longStateStorage = new LongStateStorage(); + + ZooKeeperStateHandleStore zkStore = new ZooKeeperStateHandleStore<>( + ZOOKEEPER.getClient(), + longStateStorage, + Executors.directExecutor()); + + final Collection paths = Arrays.asList("/state1", "/state2", "/state3"); + + for (String path : paths) { + zkStore.addAndLock(path, 42L); + } + + for (String path : paths) { + Stat stat = ZOOKEEPER.getClient().checkExists().forPath(zkStore.getLockPath(path)); + + assertNotNull("Expecte and existing lock.", stat); + } + + zkStore.releaseAll(); + + for (String path : paths) { + Stat stat = ZOOKEEPER.getClient().checkExists().forPath(path); + + assertEquals(0, stat.getNumChildren()); + } + + zkStore.releaseAndTryRemoveAll(); + + Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/"); + + assertEquals(0, stat.getNumChildren()); } // ---------------------------------------------------------------------------------------------