Skip to content
Permalink
Browse files
CURATOR-623: Add ConnectionStateListener for ChildrenCache (used by Q…
…ueues) (#401)

ChildrenCache (used by Queues) didn't have a ConnectionStateListener. Thus, if a long network partition occurred the ZK instance would be recreated losing any set watcher and the ChildrenCache would fail to continue watching changes. Adding a ConnectionStateListener fixes this.
  • Loading branch information
Randgalt committed May 22, 2022
1 parent f44ff88 commit d58aa73f4cc78325abb2ae0ffd95fd9cf0776541
Showing 2 changed files with 120 additions and 12 deletions.
@@ -25,6 +25,9 @@
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.PathUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import java.io.Closeable;
@@ -33,7 +36,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.utils.PathUtils;

class ChildrenCache implements Closeable
{
@@ -49,7 +51,7 @@ public void process(WatchedEvent event) throws Exception
{
if ( !isClosed.get() )
{
sync(true);
sync();
}
}
};
@@ -66,6 +68,19 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
}
};

private final ConnectionStateListener connectionStateListener = (__, newState) -> {
if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
try
{
sync();
}
catch ( Exception e )
{
throw new RuntimeException(e);
}
}
};

static class Data
{
final List<String> children;
@@ -86,13 +101,15 @@ private Data(List<String> children, long version)

void start() throws Exception
{
sync(true);
client.getConnectionStateListenable().addListener(connectionStateListener);
sync();
}

@Override
public void close() throws IOException
{
client.removeWatchers();
client.getConnectionStateListenable().removeListener(connectionStateListener);
isClosed.set(true);
notifyFromCallback();
}
@@ -137,16 +154,9 @@ private synchronized void notifyFromCallback()
notifyAll();
}

private synchronized void sync(boolean watched) throws Exception
private synchronized void sync() throws Exception
{
if ( watched )
{
client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
}
else
{
client.getChildren().inBackground(callback).forPath(path);
}
client.getChildren().usingWatcher(watcher).inBackground(callback).forPath(path);
}

private synchronized void setNewChildren(List<String> newChildren)
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.queue;

import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.compatibility.Timing2;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class TestLongNetworkPartition {
private static final Timing2 timing = new Timing2();

// test for https://issues.apache.org/jira/browse/CURATOR-623
@Test
public void testLongNetworkPartition() throws Exception {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final TestingCluster testingCluster = started(new TestingCluster(1));
final CuratorFramework dyingCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
final DistributedQueue<String> dyingQueue = newQueue(dyingCuratorFramework, item -> {
if ( item.equals("0") )
{
done.complete(null);
}
}))
{
dyingQueue.start();
testingCluster.killServer(testingCluster.getInstances().iterator().next());
timing.forSessionSleep().multiple(2).sleep();
testingCluster.restartServer(testingCluster.getInstances().iterator().next());
try (final CuratorFramework aliveCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
final DistributedQueue<String> aliveQueue = newQueue(aliveCuratorFramework, null))
{
aliveQueue.start();
aliveQueue.put("0");
done.get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
}
}
}

private static DistributedQueue<String> newQueue(CuratorFramework curatorFramework, Consumer<String> consumer) {
curatorFramework.start();
return QueueBuilder.builder(curatorFramework, consumer == null ? null : new QueueConsumer<String>() {
@Override
public void consumeMessage(String o) {
consumer.accept(o);
}

@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
}
}, new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes();
}

@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
}, "/MyChildrenCacheTest/queue").buildQueue();
}

private static TestingCluster started(TestingCluster testingCluster) throws Exception {
testingCluster.start();
return testingCluster;
}

private static CuratorFramework getCuratorFramework(String connectString) {
return CuratorFrameworkFactory.builder()
.ensembleProvider(new FixedEnsembleProvider(connectString, true))
.sessionTimeoutMs(timing.session())
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
}
}

0 comments on commit d58aa73

Please sign in to comment.