Browse files

Issue 232: ZooKeeper guarantees that "A watch object, or function/con…

…text pair, will only

be triggered once for a given notification." Curator was breaking this guarantee by internally creating a
new Watcher object each time one was needed. This is now fixed and ZooKeeper's guarantee is restored. Big
thanks to user barkbay for his persistence and help on this.
  • Loading branch information...
1 parent 342554d commit 27550024ad3dcda7c2494f633d0b60cdc2f43ad2 @Randgalt Randgalt committed Jan 29, 2013
View
7 CHANGES.txt
@@ -1,3 +1,10 @@
+1.3.2 - xxxxxxxxxxxxxxxx
+========================
+* MAJOR BUG FIX - Issue 232: ZooKeeper guarantees that "A watch object, or function/context pair, will only
+be triggered once for a given notification." Curator was breaking this guarantee by internally creating a
+new Watcher object each time one was needed. This is now fixed and ZooKeeper's guarantee is restored. Big
+thanks to user barkbay for his persistence and help on this.
+
1.3.1 - January 28, 2013
========================
* Tightened up a possible race deep inside the connection management.
View
7 curator-framework/src/main/java/com/netflix/curator/framework/imps/CuratorFrameworkImpl.java
@@ -65,6 +65,7 @@
private final CompressionProvider compressionProvider;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
+ private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
private volatile ExecutorService executorService;
@@ -250,6 +251,7 @@ public Void apply(CuratorListener listener)
unhandledErrorListeners.clear();
connectionStateManager.close();
client.close();
+ namespaceWatcherMap.close();
executorService.shutdownNow();
}
}
@@ -534,6 +536,11 @@ NamespaceFacadeCache getNamespaceFacadeCache()
return namespaceFacadeCache;
}
+ NamespaceWatcherMap getNamespaceWatcherMap()
+ {
+ return namespaceWatcherMap;
+ }
+
private <DATA_TYPE> void sendToBackgroundCallback(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
try
View
72 curator-framework/src/main/java/com/netflix/curator/framework/imps/NamespaceWatcherMap.java
@@ -0,0 +1,72 @@
+package com.netflix.curator.framework.imps;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.MapMaker;
+import com.netflix.curator.framework.api.CuratorWatcher;
+import org.apache.zookeeper.Watcher;
+import java.io.Closeable;
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentMap;
+
+class NamespaceWatcherMap implements Closeable
+{
+ private final ConcurrentMap<Object, NamespaceWatcher> map = new MapMaker()
+ .weakValues()
+ .makeMap();
+ private final CuratorFrameworkImpl client;
+
+ NamespaceWatcherMap(CuratorFrameworkImpl client)
+ {
+ this.client = client;
+ }
+
+ @Override
+ public void close()
+ {
+ map.clear();
+ }
+
+ @VisibleForTesting
+ void drain() throws Exception
+ {
+ Runtime.getRuntime().gc();
+
+ // relies on internals of MapMakerInternalMap (obviously)
+ Class mapMakerInternalMapClass = Class.forName("com.google.common.collect.MapMakerInternalMap");
+ Field drainThresholdField = mapMakerInternalMapClass.getDeclaredField("DRAIN_THRESHOLD");
+ drainThresholdField.setAccessible(true);
+ int drainThreshold = drainThresholdField.getInt(null) + 1;
+ while ( drainThreshold-- > 0 )
+ {
+ map.get(new Object());
+ }
+ }
+
+ @VisibleForTesting
+ NamespaceWatcher get(Object key)
+ {
+ return map.get(key);
+ }
+
+ @VisibleForTesting
+ boolean isEmpty()
+ {
+ return map.isEmpty();
+ }
+
+ NamespaceWatcher getNamespaceWatcher(Watcher watcher)
+ {
+ return get(watcher, new NamespaceWatcher(client, watcher));
+ }
+
+ NamespaceWatcher getNamespaceWatcher(CuratorWatcher watcher)
+ {
+ return get(watcher, new NamespaceWatcher(client, watcher));
+ }
+
+ private NamespaceWatcher get(Object watcher, NamespaceWatcher newNamespaceWatcher)
+ {
+ NamespaceWatcher existingNamespaceWatcher = map.putIfAbsent(watcher, newNamespaceWatcher);
+ return (existingNamespaceWatcher != null) ? existingNamespaceWatcher : newNamespaceWatcher;
+ }
+}
View
4 curator-framework/src/main/java/com/netflix/curator/framework/imps/Watching.java
@@ -33,13 +33,13 @@
Watching(CuratorFrameworkImpl client, Watcher watcher)
{
- this.watcher = new NamespaceWatcher(client, watcher);
+ this.watcher = client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
this.watched = false;
}
Watching(CuratorFrameworkImpl client, CuratorWatcher watcher)
{
- this.watcher = new NamespaceWatcher(client, watcher);
+ this.watcher = client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
this.watched = false;
}
View
162 curator-framework/src/test/java/com/netflix/curator/framework/imps/TestWatcherIdentity.java
@@ -0,0 +1,162 @@
+package com.netflix.curator.framework.imps;
+
+import com.google.common.io.Closeables;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.api.CuratorWatcher;
+import com.netflix.curator.retry.RetryOneTime;
+import com.netflix.curator.test.Timing;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestWatcherIdentity extends BaseClassForTests
+{
+ private static final String PATH = "/foo";
+
+ private static class CountCuratorWatcher implements CuratorWatcher
+ {
+ private final AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public void process(WatchedEvent event) throws Exception
+ {
+ count.incrementAndGet();
+ }
+ }
+
+ private static class CountZKWatcher implements Watcher
+ {
+ private final AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public void process(WatchedEvent event)
+ {
+ System.out.println("count=" + count);
+ count.incrementAndGet();
+ }
+ }
+
+ @Test
+ public void testRefExpiration() throws Exception
+ {
+ final int MAX_CHECKS = 10;
+
+ final CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ Assert.assertNull(client.getNamespaceWatcherMap().get(new CountCuratorWatcher()));
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ service.submit
+ (
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ CountZKWatcher watcher = new CountZKWatcher();
+ client.getNamespaceWatcherMap().getNamespaceWatcher(watcher);
+ Assert.assertNotNull(client.getNamespaceWatcherMap().get(watcher));
+ latch.countDown();
+ return null;
+ }
+ }
+ );
+
+ latch.await();
+ service.shutdownNow();
+
+ Timing timing = new Timing();
+ for ( int i = 0; i < MAX_CHECKS; ++i )
+ {
+ Assert.assertTrue((i + 1) < MAX_CHECKS);
+ timing.sleepABit();
+
+ client.getNamespaceWatcherMap().drain(); // just to cause drainReferenceQueues() to get called
+ if ( client.getNamespaceWatcherMap().isEmpty() )
+ {
+ break;
+ }
+ }
+ }
+ finally
+ {
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testSimpleId()
+ {
+ CountCuratorWatcher curatorWatcher = new CountCuratorWatcher();
+ CountZKWatcher zkWatcher = new CountZKWatcher();
+ CuratorFrameworkImpl client = (CuratorFrameworkImpl)CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ try
+ {
+ Assert.assertSame(client.getNamespaceWatcherMap().getNamespaceWatcher(curatorWatcher), client.getNamespaceWatcherMap().getNamespaceWatcher(curatorWatcher));
+ Assert.assertSame(client.getNamespaceWatcherMap().getNamespaceWatcher(zkWatcher), client.getNamespaceWatcherMap().getNamespaceWatcher(zkWatcher));
+ Assert.assertNotSame(client.getNamespaceWatcherMap().getNamespaceWatcher(curatorWatcher), client.getNamespaceWatcherMap().getNamespaceWatcher(zkWatcher));
+ }
+ finally
+ {
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testCuratorWatcher() throws Exception
+ {
+ Timing timing = new Timing();
+ CountCuratorWatcher watcher = new CountCuratorWatcher();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ client.create().forPath(PATH);
+ // Add twice the same watcher on the same path
+ client.getData().usingWatcher(watcher).forPath(PATH);
+ client.getData().usingWatcher(watcher).forPath(PATH);
+ // Ok, let's test it
+ client.setData().forPath(PATH, new byte[]{});
+ timing.sleepABit();
+ Assert.assertEquals(1, watcher.count.get());
+ }
+ finally
+ {
+ Closeables.closeQuietly(client);
+ }
+ }
+
+
+ @Test
+ public void testZKWatcher() throws Exception
+ {
+ Timing timing = new Timing();
+ CountZKWatcher watcher = new CountZKWatcher();
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ client.create().forPath(PATH);
+ // Add twice the same watcher on the same path
+ client.getData().usingWatcher(watcher).forPath(PATH);
+ client.getData().usingWatcher(watcher).forPath(PATH);
+ // Ok, let's test it
+ client.setData().forPath(PATH, new byte[]{});
+ timing.sleepABit();
+ Assert.assertEquals(1, watcher.count.get());
+ }
+ finally
+ {
+ Closeables.closeQuietly(client);
+ }
+ }
+}

0 comments on commit 2755002

Please sign in to comment.