diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java index 82ba6e2fc6..f7b99f8f27 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java @@ -209,10 +209,17 @@ private void processConfigData(byte[] data) throws Exception if (!properties.isEmpty()) { QuorumMaj newConfig = new QuorumMaj(properties); - String connectionString = configToConnectionString(newConfig); - if (connectionString.trim().length() > 0) + String connectionString = configToConnectionString(newConfig).trim(); + if (!connectionString.isEmpty()) { currentConfig.set(newConfig); + String oldConnectionString = ensembleProvider.getConnectionString(); + int i = oldConnectionString.indexOf('/'); + if (i >= 0) + { + String chroot = oldConnectionString.substring(i); + connectionString += chroot; + } ensembleProvider.setConnectionString(connectionString); } else diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index 13c162e3b6..082a948a1e 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -60,6 +60,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -436,6 +437,52 @@ public void testNewMembers() throws Exception } } + @Test + public void testRemoveWithChroot() throws Exception + { + // Use a long chroot path to circumvent ZOOKEEPER-4565 and ZOOKEEPER-4601 + String chroot = "/pretty-long-chroot"; + CountDownLatch ensembleLatch = new CountDownLatch(1); + + try (CuratorFramework client = newClient(cluster.getConnectString() + chroot, ensembleLatch)) { + client.start(); + client.create().forPath("/", "deadbeef".getBytes()); + + QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble()); + assertConfig(oldConfig, cluster.getInstances()); + + CountDownLatch latch = setChangeWaiter(client); + + Collection oldInstances = cluster.getInstances(); + InstanceSpec us = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper()); + InstanceSpec removeSpec = oldInstances.iterator().next(); + if ( us.equals(removeSpec) ) { + Iterator iterator = oldInstances.iterator(); + iterator.next(); + removeSpec = iterator.next(); + } + + client.reconfig().leaving(Integer.toString(removeSpec.getServerId())).fromConfig(oldConfig.getVersion()).forEnsemble(); + + assertTrue(timing.awaitLatch(latch)); + + byte[] newConfigData = client.getConfig().forEnsemble(); + QuorumVerifier newConfig = toQuorumVerifier(newConfigData); + List newInstances = Lists.newArrayList(cluster.getInstances()); + newInstances.remove(removeSpec); + assertConfig(newConfig, newInstances); + + assertTrue(timing.awaitLatch(ensembleLatch)); + String connectString = EnsembleTracker.configToConnectionString(newConfig) + chroot; + assertEquals(connectString, ensembleProvider.getConnectionString()); + + client.getZookeeperClient().reset(); + client.sync().forPath("/"); + byte[] data = client.getData().forPath("/"); + assertThat(data).asString().isEqualTo("deadbeef"); + } + } + @Test public void testConfigToConnectionStringIPv4Normal() throws Exception { @@ -555,7 +602,17 @@ private CuratorFramework newClient(String connectionString) { return newClient(connectionString, true); } - private CuratorFramework newClient(String connectionString, boolean withEnsembleProvider) + private CuratorFramework newClient(String connectionString, boolean withEnsembleTracker) + { + return newClient(connectionString, withEnsembleTracker, null); + } + + private CuratorFramework newClient(String connectionString, CountDownLatch ensembleLatch) + { + return newClient(connectionString, ensembleLatch != null, ensembleLatch); + } + + private CuratorFramework newClient(String connectionString, boolean withEnsembleTracker, CountDownLatch ensembleLatch) { final AtomicReference connectString = new AtomicReference<>(connectionString); ensembleProvider = new EnsembleProvider() @@ -585,12 +642,17 @@ public void close() throws IOException @Override public void setConnectionString(String connectionString) { - connectString.set(connectionString); + if (!connectionString.equals(getConnectionString())) { + connectString.set(connectionString); + if (ensembleLatch != null) { + ensembleLatch.countDown(); + } + } } }; return CuratorFrameworkFactory.builder() .ensembleProvider(ensembleProvider) - .ensembleTracker(withEnsembleProvider) + .ensembleTracker(withEnsembleTracker) .sessionTimeoutMs(timing.session()) .connectionTimeoutMs(timing.connection()) .authorization("digest", superUserPassword.getBytes())