From d58aa73f4cc78325abb2ae0ffd95fd9cf0776541 Mon Sep 17 00:00:00 2001 From: Jordan Zimmerman Date: Sat, 21 May 2022 20:58:58 -0500 Subject: [PATCH] CURATOR-623: Add ConnectionStateListener for ChildrenCache (used by Queues) (#401) ChildrenCache (used by Queues) didn't have a ConnectionStateListener. Thus, if a long network partition occurred, the ZooKeeper instance would be recreated, losing any watcher that had been set, and the ChildrenCache would stop watching for changes. Adding a ConnectionStateListener that re-syncs on CONNECTED/RECONNECTED fixes this. --- .../recipes/queue/ChildrenCache.java | 34 ++++--- .../queue/TestLongNetworkPartition.java | 98 +++++++++++++++++++ 2 files changed, 120 insertions(+), 12 deletions(-) create mode 100644 curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java index e5c7e8cf86..a28a1cc86d 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ChildrenCache.java @@ -25,6 +25,9 @@ import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.PathUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import java.io.Closeable; @@ -33,7 +36,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.apache.curator.utils.PathUtils; class ChildrenCache implements Closeable { @@ -49,7 +51,7 @@ public void 
process(WatchedEvent event) throws Exception { if ( !isClosed.get() ) { - sync(true); + sync(); } } }; @@ -66,6 +68,19 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex } }; + private final ConnectionStateListener connectionStateListener = (__, newState) -> { + if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) { + try + { + sync(); + } + catch ( Exception e ) + { + throw new RuntimeException(e); + } + } + }; + static class Data { final List children; @@ -86,13 +101,15 @@ private Data(List children, long version) void start() throws Exception { - sync(true); + client.getConnectionStateListenable().addListener(connectionStateListener); + sync(); } @Override public void close() throws IOException { client.removeWatchers(); + client.getConnectionStateListenable().removeListener(connectionStateListener); isClosed.set(true); notifyFromCallback(); } @@ -137,16 +154,9 @@ private synchronized void notifyFromCallback() notifyAll(); } - private synchronized void sync(boolean watched) throws Exception + private synchronized void sync() throws Exception { - if ( watched ) - { - client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path); - } - else - { - client.getChildren().inBackground(callback).forPath(path); - } + client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path); } private synchronized void setNewChildren(List newChildren) diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java new file mode 100644 index 0000000000..2e3c7a5bc4 --- /dev/null +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestLongNetworkPartition.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.recipes.queue; + +import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.compatibility.Timing2; +import org.junit.jupiter.api.Test; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +public class TestLongNetworkPartition { + private static final Timing2 timing = new Timing2(); + + // test for https://issues.apache.org/jira/browse/CURATOR-623 + @Test + public void testLongNetworkPartition() throws Exception { + final CompletableFuture done = new CompletableFuture<>(); + try (final TestingCluster testingCluster = started(new TestingCluster(1)); + final CuratorFramework dyingCuratorFramework = getCuratorFramework(testingCluster.getConnectString()); + final DistributedQueue dyingQueue = newQueue(dyingCuratorFramework, item -> { + if ( item.equals("0") ) + { + done.complete(null); + } + })) + { + dyingQueue.start(); + 
testingCluster.killServer(testingCluster.getInstances().iterator().next()); + timing.forSessionSleep().multiple(2).sleep(); + testingCluster.restartServer(testingCluster.getInstances().iterator().next()); + try (final CuratorFramework aliveCuratorFramework = getCuratorFramework(testingCluster.getConnectString()); + final DistributedQueue aliveQueue = newQueue(aliveCuratorFramework, null)) + { + aliveQueue.start(); + aliveQueue.put("0"); + done.get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS); + } + } + } + + private static DistributedQueue newQueue(CuratorFramework curatorFramework, Consumer consumer) { + curatorFramework.start(); + return QueueBuilder.builder(curatorFramework, consumer == null ? null : new QueueConsumer() { + @Override + public void consumeMessage(String o) { + consumer.accept(o); + } + + @Override + public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { + } + }, new QueueSerializer() { + @Override + public byte[] serialize(String item) { + return item.getBytes(); + } + + @Override + public String deserialize(byte[] bytes) { + return new String(bytes); + } + }, "/MyChildrenCacheTest/queue").buildQueue(); + } + + private static TestingCluster started(TestingCluster testingCluster) throws Exception { + testingCluster.start(); + return testingCluster; + } + + private static CuratorFramework getCuratorFramework(String connectString) { + return CuratorFrameworkFactory.builder() + .ensembleProvider(new FixedEnsembleProvider(connectString, true)) + .sessionTimeoutMs(timing.session()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + } +} \ No newline at end of file