From adb4be47ca6e64962aae3067412cc76aa4b0cd22 Mon Sep 17 00:00:00 2001 From: randgalt Date: Mon, 18 Jan 2016 17:53:27 -0500 Subject: [PATCH] Make sure NamespaceWatcherMap is cleared when the corresponding watcher is removed via new APIs. Added tests to ensure this. --- .../org/apache/curator/utils/DebugUtils.java | 1 + .../framework/imps/NamespaceWatcherMap.java | 11 +++ .../imps/RemoveWatchesBuilderImpl.java | 10 ++- .../framework/imps/WatcherRemovalFacade.java | 11 ++- .../framework/imps/WatcherRemovalManager.java | 5 +- .../framework/imps/TestRemoveWatches.java | 67 +++++++++++-------- .../framework/recipes/cache/TreeCache.java | 2 +- .../curator/test/BaseClassForTests.java | 14 ++++ 8 files changed, 88 insertions(+), 33 deletions(-) diff --git a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java index 03f6903980..beea726a8b 100644 --- a/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java +++ b/curator-client/src/main/java/org/apache/curator/utils/DebugUtils.java @@ -25,6 +25,7 @@ public class DebugUtils public static final String PROPERTY_DONT_LOG_CONNECTION_ISSUES = "curator-dont-log-connection-problems"; public static final String PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL = "curator-log-only-first-connection-issue-as-error-level"; public static final String PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND = "curator-remove-watchers-in-foreground"; + public static final String PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY = "curator-validate-namespace-watcher-map-empty"; private DebugUtils() { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java index e5aecb2aa5..00618e6327 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/NamespaceWatcherMap.java @@ -41,6 +41,11 @@ class NamespaceWatcherMap implements Closeable @Override public void close() + { + clear(); + } + + void clear() { map.clear(); } @@ -71,6 +76,12 @@ NamespaceWatcher remove(Object key) return map.remove(key); } + boolean removeWatcher(Object watcher) + { + //noinspection SuspiciousMethodCalls + return map.values().remove(watcher); + } + @VisibleForTesting boolean isEmpty() { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java index f2666e6720..c1772f1cdc 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/RemoveWatchesBuilderImpl.java @@ -231,10 +231,12 @@ private void pathInForeground(final String path) throws Exception ZooKeeper zkClient = client.getZooKeeper(); if(watcher == null) { + client.getNamespaceWatcherMap().clear(); zkClient.removeAllWatches(path, watcherType, local); } else { + client.getNamespaceWatcherMap().removeWatcher(watcher); zkClient.removeWatches(path, watcher, watcherType, local); } } @@ -252,10 +254,12 @@ public Void call() throws Exception if(watcher == null) { - zkClient.removeAllWatches(path, watcherType, local); + client.getNamespaceWatcherMap().clear(); + zkClient.removeAllWatches(path, watcherType, local); } else { + client.getNamespaceWatcherMap().removeWatcher(watcher); zkClient.removeWatches(path, watcher, watcherType, local); } } @@ -304,10 +308,12 @@ public void processResult(int rc, String path, Object ctx) ZooKeeper zkClient = client.getZooKeeper(); if(watcher == null) { - zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext()); + client.getNamespaceWatcherMap().clear(); + zkClient.removeAllWatches(operationAndData.getData(), watcherType, local, callback, operationAndData.getContext()); } else { + client.getNamespaceWatcherMap().removeWatcher(watcher); zkClient.removeWatches(operationAndData.getData(), watcher, watcherType, local, callback, operationAndData.getContext()); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java index 371fc634d1..91530b48ef 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalFacade.java @@ -28,6 +28,7 @@ import org.apache.curator.framework.api.UnhandledErrorListener; import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.DebugUtils; import org.apache.curator.utils.EnsurePath; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; @@ -41,7 +42,7 @@ class WatcherRemovalFacade extends CuratorFrameworkImpl implements WatcherRemove { super(client); this.client = client; - removalManager = new WatcherRemovalManager(client); + removalManager = new WatcherRemovalManager(client, getNamespaceWatcherMap()); } @Override @@ -65,6 +66,14 @@ public QuorumVerifier getCurrentConfig() public void removeWatchers() { removalManager.removeWatchers(); + + if ( Boolean.getBoolean(DebugUtils.PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY) ) + { + if ( !getNamespaceWatcherMap().isEmpty() ) + { + throw new RuntimeException("NamespaceWatcherMap is not empty: " + client.getNamespaceWatcherMap()); + } + } } @Override diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java index a691a948a5..064964de12 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/WatcherRemovalManager.java @@ -32,11 +32,13 @@ public class WatcherRemovalManager { private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFrameworkImpl client; + private final NamespaceWatcherMap namespaceWatcherMap; private final Set entries = Sets.newHashSet(); // guarded by sync - WatcherRemovalManager(CuratorFrameworkImpl client) + WatcherRemovalManager(CuratorFrameworkImpl client, NamespaceWatcherMap namespaceWatcherMap) { this.client = client; + this.namespaceWatcherMap = namespaceWatcherMap; } synchronized Watcher add(String path, Watcher watcher) @@ -67,6 +69,7 @@ void removeWatchers() try { log.debug("Removing watcher for path: " + entry.path); + namespaceWatcherMap.removeWatcher(entry.watcher); RemoveWatchesBuilderImpl builder = new RemoveWatchesBuilderImpl(client); builder.internalRemoval(entry, entry.path); } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java index 4e02e95e69..a7c137aa5a 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java @@ -18,12 +18,6 @@ */ package org.apache.curator.framework.imps; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; @@ -46,6 +40,9 @@ import org.apache.zookeeper.Watcher.WatcherType; import org.testng.Assert; import org.testng.annotations.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class TestRemoveWatches extends BaseClassForTests { @@ -75,7 +72,8 @@ private boolean blockUntilDesiredConnectionState(AtomicReference childMap = children.getAndSet(null); if ( childMap != null ) diff --git a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java index da1607c972..a5afaf2485 100644 --- a/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java +++ b/curator-test/src/main/java/org/apache/curator/test/BaseClassForTests.java @@ -41,6 +41,7 @@ public class BaseClassForTests private static final int RETRY_WAIT_MS = 5000; private static final String INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES; private static final String INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND; + private static final String INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY; static { @@ -67,6 +68,17 @@ public class BaseClassForTests e.printStackTrace(); } INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND = s; + s = null; + try + { + // use reflection to avoid adding a circular dependency in the pom + s = (String)Class.forName("org.apache.curator.utils.DebugUtils").getField("PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY").get(null); + } + catch ( Exception e ) + { + e.printStackTrace(); + } + INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY = s; } @BeforeSuite(alwaysRun = true) @@ -107,6 +119,7 @@ public void setup() throws Exception System.setProperty(INTERNAL_PROPERTY_DONT_LOG_CONNECTION_ISSUES, "true"); } System.setProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND, "true"); + System.setProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY, "true"); while ( server == null ) { @@ -125,6 +138,7 @@ public void setup() throws Exception @AfterMethod public void teardown() throws Exception { + System.clearProperty(INTERNAL_PROPERTY_VALIDATE_NAMESPACE_WATCHER_MAP_EMPTY); System.clearProperty(INTERNAL_PROPERTY_REMOVE_WATCHERS_IN_FOREGROUND); if ( server != null ) {