Skip to content

Commit

Permalink
ZOOKEEPER-4471: Match removing WatcherType to standard, persistent mo…
Browse files Browse the repository at this point in the history
…des (apache#1998) (#51)

* ZOOKEEPER-4471: Match removing WatcherType to standard, persistent modes

Before ZOOKEEPER-1416, `WatcherType.Children` was used to remove
watchers attached through `ZooKeeper.getChildren`. `WatcherType.Data`
was used to remove watchers attached through `ZooKeeper.getData` and
`ZooKeeper.exists`.

ZOOKEEPER-1416 adds `AddWatchMode.PERSISTENT`. This watcher could be
completed remove using `WatcherType.Any`. But when removing through
`WatcherType.Data` or `WatcherType.Children`, part of
`AddWatchMode.PERSISTENT` will be left behind. And we get persistent
children or data watchers.

We are never claiming to support these type of watchers. So fix it.

In rare chance, we are going to support persistent data or children
watchers in future, I think we probably don't want to do such "magic"
thing in ZooKeeper. So fix it.

This is a step towards ZOOKEEPER-4472 which proposed to support
`WatcherType.Persistent` and `WatcherType.PersistentRecursive` to remove
persistent watchers.

* Refactor newly added tests in WatchManagerTest

I found it somewhat hard to follow in self-review. Add given-when-then
comments from my best hope for reviewing and maintenance.

Co-authored-by: Kezhu Wang <kezhuw@gmail.com>
  • Loading branch information
anurag-harness and kezhuw committed Aug 31, 2023
1 parent 6230355 commit 64309a0
Show file tree
Hide file tree
Showing 5 changed files with 466 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1558,16 +1558,16 @@ public boolean containsWatcher(String path, WatcherType type, Watcher watcher) {
boolean containsWatcher = false;
switch (type) {
case Children:
containsWatcher = this.childWatches.containsWatcher(path, watcher);
containsWatcher = this.childWatches.containsWatcher(path, watcher, WatcherMode.STANDARD);
break;
case Data:
containsWatcher = this.dataWatches.containsWatcher(path, watcher);
containsWatcher = this.dataWatches.containsWatcher(path, watcher, WatcherMode.STANDARD);
break;
case Any:
if (this.childWatches.containsWatcher(path, watcher)) {
if (this.childWatches.containsWatcher(path, watcher, null)) {
containsWatcher = true;
}
if (this.dataWatches.containsWatcher(path, watcher)) {
if (this.dataWatches.containsWatcher(path, watcher, null)) {
containsWatcher = true;
}
break;
Expand All @@ -1579,16 +1579,16 @@ public boolean removeWatch(String path, WatcherType type, Watcher watcher) {
boolean removed = false;
switch (type) {
case Children:
removed = this.childWatches.removeWatcher(path, watcher);
removed = this.childWatches.removeWatcher(path, watcher, WatcherMode.STANDARD);
break;
case Data:
removed = this.dataWatches.removeWatcher(path, watcher);
removed = this.dataWatches.removeWatcher(path, watcher, WatcherMode.STANDARD);
break;
case Any:
if (this.childWatches.removeWatcher(path, watcher)) {
if (this.childWatches.removeWatcher(path, watcher, null)) {
removed = true;
}
if (this.dataWatches.removeWatcher(path, watcher)) {
if (this.dataWatches.removeWatcher(path, watcher, null)) {
removed = true;
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.zookeeper.server.watch;

import java.io.PrintWriter;
import javax.annotation.Nullable;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;

Expand Down Expand Up @@ -60,6 +61,21 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*/
boolean containsWatcher(String path, Watcher watcher);

/**
* Checks the specified watcher exists for the given path and mode.
*
* @param path znode path
* @param watcher watcher object reference
* @param watcherMode watcher mode, null for any mode
* @return true if the watcher exists, false otherwise
*/
default boolean containsWatcher(String path, Watcher watcher, @Nullable WatcherMode watcherMode) {
if (watcherMode == null || watcherMode == WatcherMode.DEFAULT_WATCHER_MODE) {
return containsWatcher(path, watcher);
}
throw new UnsupportedOperationException("persistent watch");
}

/**
* Removes the specified watcher for the given path.
*
Expand All @@ -70,6 +86,21 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*/
boolean removeWatcher(String path, Watcher watcher);

/**
* Removes the specified watcher for the given path and mode.
*
* @param path znode path
* @param watcher watcher object reference
* @param watcherMode watcher mode, null to remove all modes
* @return true if the watcher successfully removed, false otherwise
*/
default boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherMode) {
if (watcherMode == null || watcherMode == WatcherMode.DEFAULT_WATCHER_MODE) {
return removeWatcher(path, watcher);
}
throw new UnsupportedOperationException("persistent watch");
}

/**
* The entry to remove the watcher when the cnxn is closed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,36 +249,69 @@ public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {

@Override
public synchronized boolean containsWatcher(String path, Watcher watcher) {
Set<Watcher> list = watchTable.get(path);
return list != null && list.contains(watcher);
return containsWatcher(path, watcher, null);
}

@Override
public synchronized boolean removeWatcher(String path, Watcher watcher) {
public synchronized boolean containsWatcher(String path, Watcher watcher, WatcherMode watcherMode) {
Map<String, WatchStats> paths = watch2Paths.get(watcher);
if (paths == null) {
return false;
}
WatchStats stats = paths.get(path);
return stats != null && (watcherMode == null || stats.hasMode(watcherMode));
}

private WatchStats unwatch(String path, Watcher watcher, Map<String, WatchStats> paths, Set<Watcher> watchers) {
WatchStats stats = paths.remove(path);
if (stats == null) {
return false;
return WatchStats.NONE;
}
if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
--recursiveWatchQty;
if (paths.isEmpty()) {
watch2Paths.remove(watcher);
}
watchers.remove(watcher);
if (watchers.isEmpty()) {
watchTable.remove(path);
}
return stats;
}

Set<Watcher> list = watchTable.get(path);
if (list == null || !list.remove(watcher)) {
LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher);
@Override
public synchronized boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherMode) {
Map<String, WatchStats> paths = watch2Paths.get(watcher);
Set<Watcher> watchers = watchTable.get(path);
if (paths == null || watchers == null) {
return false;
}

if (list.isEmpty()) {
watchTable.remove(path);
WatchStats oldStats;
WatchStats newStats;
if (watcherMode != null) {
oldStats = paths.getOrDefault(path, WatchStats.NONE);
newStats = oldStats.removeMode(watcherMode);
if (newStats != WatchStats.NONE) {
if (newStats != oldStats) {
paths.put(path, newStats);
}
} else if (oldStats != WatchStats.NONE) {
unwatch(path, watcher, paths, watchers);
}
} else {
oldStats = unwatch(path, watcher, paths, watchers);
newStats = WatchStats.NONE;
}

return true;
if (oldStats.hasMode(WatcherMode.PERSISTENT_RECURSIVE) && !newStats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
--recursiveWatchQty;
}

return oldStats != newStats;
}

@Override
public synchronized boolean removeWatcher(String path, Watcher watcher) {
return removeWatcher(path, watcher, null);
}

// VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -847,11 +850,35 @@ public void testRemoveAllDataWatchesOnAPath(boolean useAsync) throws Exception {
LOG.info("Adding data watcher {} on path {}", w2, "/node1");
assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");

BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);

assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher");
removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK, useAsync);
assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");

assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal");

zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.setData("/node1/child", new byte[0], -1);
zk1.delete("/node1/child", -1);
zk1.setData("/node1", new byte[0], -1);
zk1.delete("/node1", -1);

assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeDeleted, "/node1");

assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child");
assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1/child");
assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1/child");
assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1");

assertEquals(2, dWatchCount.getCount(), "Received watch notification after removal!");
}

/**
Expand Down Expand Up @@ -895,11 +922,27 @@ public void testRemoveAllChildWatchesOnAPath(boolean useAsync) throws Exception
LOG.info("Adding child watcher {} on path {}", w2, "/node1");
assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches");

BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);

assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher");
removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK, useAsync);
assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove child watcher");

assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal");

zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.setData("/node1/child", new byte[0], -1);
zk1.delete("/node1/child", -1);
zk1.setData("/node1", new byte[0], -1);
zk1.delete("/node1", -1);

assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeDeleted, "/node1");

assertEquals(2, cWatchCount.getCount(), "Received watch notification after removal!");
}

/**
Expand Down Expand Up @@ -953,10 +996,26 @@ public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception {
LOG.info("Adding data watcher {} on path {}", w2, "/node1");
assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");

BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);

assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is not a watcher");
assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher");
assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher");
removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK, useAsync);
assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");
assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is still a watcher after removal");
assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal");
assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal");

assertEvent(persistentEvents, EventType.PersistentWatchRemoved, "/node1");
assertEvent(recursiveEvents, EventType.PersistentWatchRemoved, "/node1");

zk1.delete("/node1", -1);
assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS));
assertNull(recursiveEvents.poll(10, TimeUnit.MILLISECONDS));
assertEquals(2, watchCount.getCount(), "Received watch notification after removal!");
}

Expand Down Expand Up @@ -1090,4 +1149,14 @@ private boolean isServerSessionWatcher(long sessionId, String path, WatcherType
return false;
}

/**
* Asserts next event from queue has given event type and path.
*/
private void assertEvent(BlockingQueue<WatchedEvent> events, Watcher.Event.EventType eventType, String path)
throws InterruptedException {
WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
assertNotNull(event);
assertEquals(eventType, event.getType());
assertEquals(path, event.getPath());
}
}
Loading

0 comments on commit 64309a0

Please sign in to comment.