From 022de3921a120c6f86cc6e21442327cc04b66cd2 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 18 Aug 2016 19:34:10 +0100 Subject: [PATCH] CURATOR-344 - do watch event processing tasks in the background and limit shared value watcher to valid change events to avoid work on disconnect --- .../framework/recipes/shared/SharedValue.java | 24 +++- .../recipes/shared/TestSharedCount.java | 106 ++++++++++++++++++ 2 files changed, 127 insertions(+), 3 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java index dddc471647..1f9df379d8 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java @@ -22,6 +22,8 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.framework.listen.ListenerContainer; import org.apache.curator.framework.state.ConnectionState; @@ -30,6 +32,7 @@ import org.apache.curator.utils.ThreadUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +62,10 @@ public class SharedValue implements Closeable, SharedValueReader @Override public void process(WatchedEvent event) throws Exception { - if ( state.get() == State.STARTED ) + if ( state.get() == State.STARTED && event.getType() != Watcher.Event.EventType.None ) { - readValue(); - notifyListeners(); + // don't block event thread in possible retry + readValueAndNotifyListenersInBackground(); } } }; @@ -248,6 +251,21 @@ private void readValue() throws Exception updateValue(localStat.getVersion(), bytes); } + private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + if (event.getResultCode() == KeeperException.Code.OK.intValue()) { + updateValue(event.getStat().getVersion(), event.getData()); + notifyListeners(); + } + } + }; + + private void readValueAndNotifyListenersInBackground() throws Exception + { + client.getData().usingWatcher(watcher).inBackground(upadateAndNotifyListenerCallback).forPath(path); + } + private void notifyListeners() { final byte[] localValue = getValue(); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java index a1f4d8c7f8..7939f6ea09 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java @@ -23,7 +23,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.retry.RetryNTimes; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.Timing; @@ -40,6 +44,7 @@ import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class TestSharedCount extends BaseClassForTests { @@ -283,4 +288,105 @@ public void testMultiClientDifferentSeed() throws Exception CloseableUtils.closeQuietly(client1); } } + + + @Test + public void testDisconnectEventOnWatcherDoesNotRetry() throws Exception + { + final CountDownLatch gotSuspendEvent = new CountDownLatch(1); + + CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 1000)); + curatorFramework.start(); + curatorFramework.blockUntilConnected(); + + SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10); + sharedCount.start(); + + curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.SUSPENDED) { + gotSuspendEvent.countDown(); + } + } + }); + + try + { + server.stop(); + // if watcher goes into 10second retry loop we won't get timely notification + Assert.assertTrue(gotSuspendEvent.await(5, TimeUnit.SECONDS)); + } + finally + { + CloseableUtils.closeQuietly(sharedCount); + CloseableUtils.closeQuietly(curatorFramework); + } + } + + @Test + public void testDisconnectReconnectEventDoesNotFireValueWatcher() throws Exception + { + final CountDownLatch gotSuspendEvent = new CountDownLatch(1); + final CountDownLatch gotChangeEvent = new CountDownLatch(1); + final CountDownLatch getReconnectEvent = new CountDownLatch(1); + + final AtomicInteger numChangeEvents = new AtomicInteger(0); + + + CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryNTimes(10, 500)); + curatorFramework.start(); + curatorFramework.blockUntilConnected(); + + SharedCount sharedCount = new SharedCount(curatorFramework, "/count", 10); + + sharedCount.addListener(new SharedCountListener() { + @Override + public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception { + numChangeEvents.incrementAndGet(); + gotChangeEvent.countDown(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.SUSPENDED) { + gotSuspendEvent.countDown(); + } else if (newState == ConnectionState.RECONNECTED) { + getReconnectEvent.countDown(); + } + } + }); + sharedCount.start(); + + try + { + sharedCount.setCount(11); + Assert.assertTrue(gotChangeEvent.await(2, TimeUnit.SECONDS)); + + server.stop(); + Assert.assertTrue(gotSuspendEvent.await(2, TimeUnit.SECONDS)); + + server.restart(); + Assert.assertTrue(getReconnectEvent.await(2, TimeUnit.SECONDS)); + + sharedCount.trySetCount(sharedCount.getVersionedValue(), 12); + + // flush background task queue + final CountDownLatch flushDone = new CountDownLatch(1); + curatorFramework.getData().inBackground(new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + flushDone.countDown(); + } + }).forPath("/count"); + flushDone.await(5, TimeUnit.SECONDS); + + Assert.assertEquals(2, numChangeEvents.get()); + } + finally + { + CloseableUtils.closeQuietly(sharedCount); + CloseableUtils.closeQuietly(curatorFramework); + } + } }